Following system colour scheme Selected dark colour scheme Selected light colour scheme

Python 增强提案

PEP 3156 – 异步 IO 支持重启: “asyncio” 模块

作者:
Guido van Rossum <guido at python.org>
BDFL-代表:
Antoine Pitrou <antoine at python.org>
讨论对象:
python-tulip@googlegroups.com
状态:
最终
类型:
标准跟踪
创建时间:
2012-12-12
Python 版本:
3.3
历史记录:
2012-12-21
替换:
3153
解决方案:
Python-Dev 消息

目录

摘要

这是 Python 3 中异步 I/O 的提案,从 Python 3.3 开始。将此视为 PEP 3153 中缺少的具体提案。该提案包含可插拔的事件循环、传输和协议抽象,类似于 Twisted 中的抽象,以及基于 yield from (PEP 380) 的更高级调度程序。建议的包名为 asyncio

简介

状态

一个参考实现存在于代码名称 Tulip 下。Tulip 仓库链接在最后的参考文献部分。基于此仓库的包将提供在 PyPI 上(见参考文献),以使用 asyncio 包与 Python 3.3 安装一起使用。

截至 2013 年 10 月 20 日,asyncio 包已签入 Python 3.4 存储库,并与 Python 3.4-alpha-4 一起发布,具有“临时”API 状态。这是对信心的表达,旨在增加对 API 的早期反馈,而不是强迫接受 PEP。预期该包将在 Python 3.4 中保持临时状态,并在 Python 3.5 中进展到最终状态。开发继续主要在 Tulip 仓库中进行,偶尔会将更改合并到 CPython 仓库中。

依赖

Python 3.3 是许多提议功能所必需的。参考实现(Tulip)除了 Python 3.3 之外不需要任何新的语言或标准库功能,不需要任何第三方模块或包,也不需要任何 C 代码,除了 Windows 上的(可选)IOCP 支持。

模块命名空间

此处的规范位于一个新的顶级包 asyncio 中。不同的组件位于包的独立子模块中。该包将从其各自的子模块导入常见的 API,并将它们作为包属性提供(类似于电子邮件包的工作方式)。对于此类常见的 API,实际定义它们的子模块名称不是规范的一部分。不太常见的 API 可能需要显式地从其各自的子模块导入,在这种情况下,子模块名称是规范的一部分。

在没有子模块名称的情况下定义的类和函数假定位于顶级包的命名空间中。(但不要将这些与各种类的​​方法混淆,这些方法出于简洁考虑,在某些情况下也无需命名空间前缀。)

互操作性

事件循环是大多数互操作性发生的地方。对于(Python 3.3 端口)像 Twisted、Tornado 甚至 gevents 这样的框架,应该很容易通过使用轻量级适配器或代理来适应默认的事件循环实现以满足其需求,或者用他们自己的事件循环实现的改编来替换默认的事件循环实现。(一些框架,如 Twisted,有多个事件循环实现。这应该不是问题,因为它们都有相同的接口。)

在大多数情况下,两个不同的第三方框架应该能够相互操作,要么通过共享默认的事件循环实现(每个框架使用自己的适配器),要么通过共享任一框架的事件循环实现。在后一种情况下,将发生两级适配(从框架 A 的事件循环到标准事件循环接口,再从那里到框架 B 的事件循环)。使用哪个事件循环实现应该由主程序控制(尽管提供了一个用于事件循环选择的默认策略)。

为了使这种互操作性有效,第三方框架中首选的适配方向是保留默认的事件循环并将其适应框架的 API。理想情况下,所有第三方框架都会放弃自己的事件循环实现,转而使用标准实现。但并非所有框架都能满足标准实现提供的功能。

为了支持双向适配,指定了两个独立的 API

  • 用于管理当前事件循环的接口
  • 符合的事件循环的接口

事件循环实现可以提供其他方法和保证,只要这些方法在文档中被调用为非标准方法。事件循环实现也可以省略某些方法,如果它们无法在给定的环境中实现;但是,此类与标准 API 的偏差应仅被视为最后的手段,并且仅当平台或环境强制执行时。(一个例子是在存在无法启动或停止的系统事件循环的平台上;参见下面的“嵌入式事件循环”。)

事件循环 API 不依赖于 await/yield from。相反,它使用回调、附加接口(传输和协议)以及期货的组合。后者类似于 PEP 3148 中定义的期货,但具有不同的实现,并且不与线程绑定。特别是,result() 方法在结果尚未准备好时会抛出异常,而不是阻塞;预计用户将使用回调(或 await/yield from)等待结果。

对于所有指定为返回协程的事件循环方法,允许它们返回期货或协程,由实现选择(标准实现始终返回协程)。所有记录为接受协程参数的事件循环方法必须接受此类参数的期货和协程。(一个便利函数,ensure_future(),存在于将协程或期货参数转换为期货。)

对于不喜欢使用回调的用户(像我一样),提供了一个调度程序,用于使用 PEP 380 yield fromPEP 492 await 表达式将异步 I/O 代码编写为协程。调度程序不可插拔;可插拔性发生在事件循环级别,标准调度程序实现应该与任何符合的事件循环实现一起工作。(事实上,这是一个重要的检验符合实现的标准。)

为了在使用协程编写的代码和其他异步框架之间进行互操作,调度程序定义了一个 Task 类,其行为类似于期货。与事件循环级别进行互操作的框架可以通过向期货添加回调来等待期货完成。同样,调度程序提供了一个操作来挂起协程,直到回调被调用。

如果此类框架无法按原样使用期货和 Task 类,则它可以重新实现 loop.create_future()loop.create_task() 方法。这些方法应该返回实现(期货/Task 接口的超集)的对象。

一个不太雄心勃勃的框架可能只会调用 loop.set_task_factory() 来替换 Task 类,而无需实现自己的事件循环。

事件循环 API 为线程提供了有限的互操作性:有一个 API 可以将函数提交给执行器(参见 PEP 3148),该执行器返回一个与事件循环兼容的期货,并且有一个方法可以以线程安全的方式从另一个线程调度事件循环的回调。

传输和协议

对于不熟悉 Twisted 的人,有必要简要解释一下传输和协议之间的关系。在最高级别,传输关注字节如何传输,而协议确定字节哪些传输(以及在某种程度上何时传输)。

另一种说法是:传输是套接字(或类似的 I/O 端点)的抽象,而协议是从传输的角度来看应用程序的抽象。

另一种看法是,传输和协议接口一起定义了使用网络 I/O 和进程间 I/O 的抽象接口。

几乎总是存在传输和协议对象之间的 1:1 关系:协议调用传输方法来发送数据,而传输调用协议方法来传递已接收的数据。传输或协议方法都不会“阻塞”——它们会启动事件,然后返回。

最常见的传输类型是双向流传输。它代表一对缓冲流(每个方向一个),每个流传输一个字节序列。最常见的双向流传输示例可能是 TCP 连接。另一个常见示例是 SSL/TLS 连接。但也有一些其他事物可以这样看待,例如 SSH 会话或一对 UNIX 管道。通常情况下,传输实现并不多,而且大多数传输实现都附带事件循环实现。但是,并非所有传输都必须通过调用事件循环方法来创建:第三方模块可以实现新的传输,并为其提供构造函数或工厂函数,这些函数只需要将事件循环作为参数或调用 get_event_loop()

注意,传输不需要使用套接字,即使它们使用 TCP 也是如此——套接字是平台特定的实现细节。

双向流传输有两个“端点”:一个端点与网络(或另一个进程,或它包装的任何低级接口)通信,另一个端点与协议通信。前者使用任何必要的 API 来实现传输;但传输和协议之间的接口由本 PEP 标准化。

协议可以表示某种“应用程序级”协议,例如 HTTP 或 SMTP;它也可以实现由多个协议共享的抽象,或整个应用程序。协议的主要接口是与传输的接口。虽然一些流行的协议(和其他抽象)可能具有标准实现,但应用程序通常会实现自定义协议。拥有可从 PyPI 下载和安装的有用第三方协议实现库也很有意义。

传输和协议的通用概念包括其他接口,其中传输包装了一些其他通信抽象。例如,用于发送和接收数据报的接口(例如 UDP),或子进程管理器。关注点的分离与双向流传输和协议相同,但传输和协议之间的特定接口在每种情况下都不同。

各种标准类型的传输和协议定义的接口的详细信息将在后面给出。

事件循环接口规范

事件循环策略:获取和设置当前事件循环

事件循环管理由事件循环策略控制,事件循环策略是一个全局(每个进程)对象。存在默认策略,以及更改策略的 API。策略定义了上下文的概念;策略为每个上下文管理一个独立的事件循环。默认策略的上下文概念定义为当前线程。

某些平台或编程框架可能会将默认策略更改为更适合该平台或框架用户期望的策略。此类平台或框架必须记录其策略以及在初始化序列中的哪个点设置策略,以避免在多个活动框架想要覆盖默认策略时出现未定义的行为。(另请参阅下面的“嵌入式事件循环”。)

要获取当前上下文的事件循环,请使用 get_event_loop()。这将返回一个实现下面指定的接口的事件循环对象,或者如果当前上下文没有设置事件循环且当前策略未指定创建事件循环,则会引发异常。它不应该返回 None

要为当前上下文设置事件循环,请使用 set_event_loop(event_loop),其中 event_loop 是一个事件循环对象,即 AbstractEventLoop 的实例,或者 None。将当前事件循环设置为 None 是可以的,在这种情况下,后续对 get_event_loop() 的调用将引发异常。这对于测试不应该依赖于默认事件循环存在的代码很有用。

预期 get_event_loop() 返回一个不同的事件循环对象,具体取决于上下文(实际上,这就是上下文的定义)。如果未设置事件循环并且策略允许创建事件循环,它可能会创建一个新的事件循环对象。默认策略只会在主线程(由 threading.py 定义,它使用特殊子类表示主线程)中创建一个新的事件循环,并且只有在调用 get_event_loop() 之前调用 set_event_loop()。(要重置此状态,请重置策略。)在其他线程中,必须显式设置事件循环。其他策略可能会有不同的行为。默认策略创建的事件循环是惰性的;即对 get_event_loop() 的第一次调用将在必要时并根据当前策略创建事件循环实例。

为了方便单元测试和其他特殊情况,存在第三个策略函数:new_event_loop(),它根据策略的默认规则创建并返回一个新的事件循环对象。要将此事件循环设置为当前事件循环,必须使用它调用 set_event_loop()

要更改事件循环策略,请调用 set_event_loop_policy(policy),其中 policy 是一个事件循环策略对象或 None。如果不是 None,则策略对象必须是 AbstractEventLoopPolicy 的实例,该实例定义了方法 get_event_loop()set_event_loop(loop)new_event_loop(),所有这些方法的行为都与上面描述的函数相同。

传递 None 的策略值将恢复默认事件循环策略(覆盖平台或框架设置的备用默认值)。默认事件循环策略是类 DefaultEventLoopPolicy 的实例。可以通过调用 get_event_loop_policy() 来检索当前事件循环策略对象。

待定:描述子进程处理的子进程监视器和 UNIX quirks。

显式传递事件循环

可以编写使用事件循环而无需依赖全局或每个线程默认事件循环的代码。为此,所有需要访问当前事件循环(并且不是事件类上的方法)的 API 都带有一个名为 loop 的可选关键字参数。如果此参数为 None 或未指定,则此类 API 将调用 get_event_loop() 来获取默认事件循环,但如果 loop 关键字参数设置为一个事件循环对象,它们将使用该事件循环,并将它传递给它们调用的任何其他此类 API。例如,Future(loop=my_loop) 将创建一个与事件循环 my_loop 绑定的 Future。当默认当前事件为 None 时,loop 关键字参数实际上是强制性的。

注意,显式传递的事件循环仍然必须属于当前线程;loop 关键字参数不会神奇地改变事件循环使用方式的约束。

指定时间

像往常一样,在 Python 中,所有超时、间隔和延迟都以秒为单位,可以是整数或浮点数。但是,绝对时间不会指定为 POSIX 时间戳。时钟的精度、精确度和纪元由实现决定。

默认实现使用 time.monotonic()。关于此选择的影响可以写成书。最好阅读标准库 time 模块的文档。

嵌入式事件循环

在某些平台上,事件循环由系统提供。此类循环可能在用户代码启动时就已经运行,并且可能无法停止或关闭它,除非退出程序。在这种情况下,启动、停止和关闭事件循环的方法可能无法实现,并且 is_running() 可能始终返回 True

事件循环类

实际上没有名为 EventLoop 的类。存在一个 AbstractEventLoop 类,它定义了所有没有实现的方法,主要用作文档。定义了以下具体类

  • SelectorEventLoop 是基于 selectors 模块(Python 3.4 中新增)的完整 API 的具体实现。构造函数接受一个可选参数,即 selectors.Selector 对象。默认情况下,将创建一个 selectors.DefaultSelector 的实例并使用它。
  • ProactorEventLoop 是 API 的具体实现,但 I/O 事件处理和信号处理方法除外。它只在 Windows(或支持类似 API 以进行“重叠 I/O”的其他平台)上定义。构造函数接受一个可选参数,即 Proactor 对象。默认情况下,将创建一个 IocpProactor 的实例并使用它。(IocpProactor 类未在本 PEP 中指定;它只是 ProactorEventLoop 类的实现细节。)

事件循环方法概述

符合规范的事件循环的方法分为几类。第一组类别必须由所有符合规范的事件循环实现支持,但嵌入式事件循环可能不会实现启动、停止和关闭方法除外。(但是,部分符合规范的事件循环仍然比没有好。:-))

  • 启动、停止和关闭:run_forever()run_until_complete()stop()is_running()close()is_closed()
  • 基本和定时回调:call_soon()call_later()call_at()time()
  • 线程交互:call_soon_threadsafe()run_in_executor()set_default_executor()
  • Internet 名称查找:getaddrinfo()getnameinfo()
  • Internet 连接:create_connection()create_server()create_datagram_endpoint()
  • 包装的套接字方法:sock_recv()sock_sendall()sock_connect()sock_accept()
  • 任务和期货支持:create_future()create_task()set_task_factory()get_task_factory()
  • 错误处理:get_exception_handler()set_exception_handler()default_exception_handler()call_exception_handler()
  • 调试模式:get_debug()set_debug()

第二组类别可能得到符合的事件循环实现的支持。如果不受支持,它们将引发 NotImplementedError。(在默认实现中,UNIX 系统上的 SelectorEventLoop 支持所有这些;Windows 上的 SelectorEventLoop 支持 I/O 事件处理类别;Windows 上的 ProactorEventLoop 支持管道和子进程类别。)

  • I/O 回调:add_reader()remove_reader()add_writer()remove_writer()
  • 管道和子进程:connect_read_pipe()connect_write_pipe()subprocess_shell()subprocess_exec()
  • 信号回调:add_signal_handler()remove_signal_handler()

事件循环方法

启动、停止和关闭

一个(未关闭的)事件循环可以处于两种状态之一:运行或停止。这些方法用于启动和停止事件循环。

  • run_forever()。运行事件循环,直到调用 stop()。当事件循环已在运行时,不能调用此方法。(这在一定程度上拥有一个较长的名称,以避免与该 PEP 的早期版本产生混淆,在早期版本中,run() 的行为有所不同,部分原因是已经有太多 API 拥有名为 run() 的方法,部分原因是调用此方法的地方不应该太多。)
  • run_until_complete(future)。运行事件循环,直到期货完成。如果期货完成,则返回其结果,或引发其异常。当事件循环已在运行时,不能调用此方法。该方法如果参数是协程,则会创建一个新的 Task 对象。
  • stop()。在方便时停止事件循环。随后使用 run_forever()run_until_complete() 重新启动循环是完全可以的;如果这样做,不会丢失任何已安排的回调。注意:stop() 会正常返回,并且当前回调可以继续执行。此后事件循环停止的时间取决于具体实现,但目的是停止在轮询 I/O 之前,并且不运行将来安排的任何回调;具体实现的主要自由度在于它在停止之前处理了多少“就绪队列”(使用 call_soon() 已安排的回调)。
  • is_running()。如果事件循环当前正在运行,则返回 True,如果已停止,则返回 False
  • close()。关闭事件循环,释放它可能持有的任何资源,例如 epoll()kqueue() 使用的文件描述符,以及默认执行器。在事件循环正在运行时不应调用此方法。调用之后,不应再次使用事件循环。可以多次调用它;后续调用是无操作的。
  • is_closed()。如果事件循环已关闭,则返回 True,否则返回 False。(主要用于错误报告;请不要基于此方法实现功能。)

基本回调

与同一个事件循环关联的回调是严格串行化的:一个回调必须完成才能调用下一个回调。这是一个重要的保证:当两个或多个回调使用或修改共享状态时,每个回调都保证在它运行时,共享状态不会被另一个回调更改。

  • call_soon(callback, *args)。这会安排一个回调,以便尽快调用它。返回一个表示回调的 Handle(见下文),其 cancel() 方法可用于取消回调。它保证回调按其安排顺序调用。
  • call_later(delay, callback, *args)。安排在将来大约 delay 秒后调用 callback(*args) 一次,除非取消。返回一个表示回调的 Handle,其 cancel() 方法可用于取消回调。在过去或在完全相同的时间安排的回调将以未定义的顺序调用。
  • call_at(when, callback, *args)。这类似于 call_later(),但时间表示为绝对时间。返回一个类似的 Handle。存在一个简单的等效性:loop.call_later(delay, callback, *args) 等同于 loop.call_at(loop.time() + delay, callback, *args)
  • time()。返回事件循环时钟的当前时间。这可能是 time.time()time.monotonic() 或其他系统特定的时钟,但它必须返回一个浮点数,以表示自某个纪元以来的时间,单位约为一秒。(没有时钟是完美的 - 请参阅 PEP 418。)

注意:该 PEP 的先前版本定义了一个名为 call_repeatedly() 的方法,该方法承诺定期调用回调。此方法已撤回,因为此类函数的设计过度指定。一方面,一个简单的计时器循环可以轻松地使用一个回调来模拟,该回调使用 call_later() 重新安排自身;也可以轻松地编写包含一个循环和一个 sleep() 调用的协程(模块中的顶级函数,见下文)。另一方面,由于精确计时机制的复杂性,这里有许多陷阱和缺陷,不了解的人很容易陷入其中(请参阅 PEP 418),并且不同的用例在边缘情况下需要不同的行为。不可能提供一个 API 来满足这种情况下的所有需求,因此,最好让应用程序设计人员自行决定要实现哪种计时器循环。

线程交互

  • call_soon_threadsafe(callback, *args)。类似于 call_soon(callback, *args),但当从另一个线程调用时,而事件循环正在阻塞以等待 I/O,会取消阻塞事件循环。返回一个 Handle。这是唯一一个可以从另一个线程安全调用的方法。(要在线程安全的方式中为将来安排一个回调,可以使用 loop.call_soon_threadsafe(loop.call_later, when, callback, *args)。)注意:这在信号处理程序中不安全(因为它可能使用锁)。实际上,没有 API 是信号安全的;如果要处理信号,请使用下面描述的 add_signal_handler()
  • run_in_executor(executor, callback, *args)。安排在执行器中调用 callback(*args)(请参阅 PEP 3148)。返回一个 asyncio.Future 实例,其成功时的结果是该调用的返回值。这等效于 wrap_future(executor.submit(callback, *args))。如果 executorNone,则使用由 set_default_executor() 设置的默认执行器。如果没有设置默认执行器,则创建一个具有默认线程数量的 ThreadPoolExecutor 并将其设置为默认执行器。(默认实现在此情况下使用 5 个线程。)
  • set_default_executor(executor)。设置 run_in_executor() 使用的默认执行器。该参数必须是一个 PEP 3148 Executor 实例或 None,以便重置默认执行器。

另请参阅期货部分中描述的 wrap_future() 函数。

互联网名称查找

如果您想连接或绑定套接字到地址,而不会有因名称查找而阻塞的风险,这些方法很有用。它们通常由 create_connection()create_server()create_datagram_endpoint() 隐式调用。

  • getaddrinfo(host, port, family=0, type=0, proto=0, flags=0)。类似于 socket.getaddrinfo() 函数,但返回一个 Future。Future 成功后的结果将是一个与 socket.getaddrinfo() 返回的格式相同的列表,即 (address_family, socket_type, socket_protocol, canonical_name, address) 的列表,其中 address 是一个 2 元组 (ipv4_address, port)(对于 IPv4 地址)或一个 4 元组 (ipv6_address, port, flow_info, scope_id)(对于 IPv6 地址)。如果 family 参数为零或未指定,则返回的列表可能包含 IPv4 和 IPv6 地址的混合;否则,返回的地址将受 family 值限制(protoflags 也是如此)。默认实现使用 run_in_executor() 调用 socket.getaddrinfo(),但其他实现可以选择实现自己的 DNS 查找。可选参数**必须**以关键字参数的形式指定。

    注意:实现允许实现完整 socket.getaddrinfo() 接口的一个子集;例如,它们可能不支持符号端口名,或者可能忽略或不完全实现 typeprotoflags 参数。但是,如果忽略了 typeproto,则传入的参数值应原封不动地复制到返回元组的 socket_typesocket_protocol 元素中。(您不能忽略 family,因为 IPv4 和 IPv6 地址必须以不同的方式查找。 family 的唯一允许值是 socket.AF_UNSPEC (0)、socket.AF_INETsocket.AF_INET6,后者只有在平台定义的情况下才允许)。

  • getnameinfo(sockaddr, flags=0)。类似于 socket.getnameinfo(),但返回一个 Future。Future 成功后的结果将是一个元组 (host, port)。与 getaddrinfo() 相同的实现说明。

互联网连接

这些是用于管理互联网连接的高级接口。建议使用它们,而不是使用相应的低级接口,因为它们抽象了基于选择器和基于预处理器的事件循环之间的差异。

请注意,流连接的客户端和服务器端使用相同的传输和协议接口。但是,数据报端点使用不同的传输和协议接口。

  • create_connection(protocol_factory, host, port, <options>)。建立到给定互联网主机和端口的流连接。这通常是连接客户端调用的任务。它创建一个实现相关的双向流 Transport 来表示连接,然后调用 protocol_factory() 来实例化(或检索)用户的 Protocol 实现,最后将两者绑定在一起。(有关 Transport 和 Protocol 的定义,请参见下文)。用户的 Protocol 实现是通过调用 protocol_factory()(不带参数*)创建或检索的。协程成功后的结果是 (transport, protocol) 对;如果失败阻止创建成功的连接,则会引发适当的异常。请注意,当协程完成时,协议的 connection_made() 方法尚未被调用;这将在连接握手完成时发生。

    (*) 没有要求 protocol_factory 是一个类。如果您的协议类需要将特定参数传递给其构造函数,可以使用 lambda。您还可以传递一个简单的 lambda,它返回一个之前构建的 Protocol 实例。

    <options> 全部使用可选关键字参数指定

    • ssl:传递 True 以创建一个 SSL/TLS 传输(默认情况下创建一个纯 TCP 传输)。或者传递一个 ssl.SSLContext 对象来覆盖要使用的默认 SSL 上下文对象。如果创建了默认上下文,则由实现配置合理的默认值。参考实现目前使用 PROTOCOL_SSLv23 并设置 OP_NO_SSLv2 选项,调用 set_default_verify_paths() 并将 verify_mode 设置为 CERT_REQUIRED。此外,无论何时上下文(默认上下文或其他上下文)指定 verify_modeCERT_REQUIREDCERT_OPTIONAL,如果给出了主机名,则在成功握手后立即调用 ssl.match_hostname(peercert, hostname),如果这引发了异常,则连接将关闭。(要避免这种行为,传入一个 verify_mode 设置为 CERT_NONE 的 SSL 上下文。但这意味着您不安全,并且容易受到例如中间人攻击)。
    • familyprotoflags:要传递给 getaddrinfo() 的地址族、协议和标志。这些都默认为 0,这意味着“未指定”。(套接字类型始终为 SOCK_STREAM)。如果未指定这些值中的任何一个,getaddrinfo() 方法将选择合适的值。注意:proto 与高级 Protocol 概念或 protocol_factory 参数无关。
    • sock:一个可选的套接字,用于代替使用 hostportfamilyprotoflags 参数。如果给出了这个参数,hostport 必须显式地设置为 None
    • local_addr:如果给定,则为 (host, port) 元组,用于将套接字本地绑定。这很少需要,但在多宿主服务器上,您有时需要强制连接来自特定地址。这就是您如何做到这一点。使用 getaddrinfo() 查找主机和端口。
    • server_hostname:这只有在使用 SSL/TLS 时才相关;如果未设置 ssl,则不应使用它。当设置了 ssl 时,这将设置或覆盖将被验证的主机名。默认情况下使用 host 参数的值。如果 host 为空,则没有默认值,您必须为 server_hostname 传递一个值。要禁用主机名验证(这是一个严重的安全性风险),您必须在此处传递一个空字符串,并将 verify_mode 设置为 ssl.CERT_NONEssl.SSLContext 对象作为 ssl 参数传递。
  • create_server(protocol_factory, host, port, <options>)。进入一个接受连接的服务循环。这是一个协程,它在服务循环设置为服务后完成。返回值是一个 Server 对象,可用于以受控方式停止服务循环(请参见下文)。如果指定的地址允许 IPv4 和 IPv6 连接,则可能会绑定多个套接字。

    每次接受连接时,都会调用 protocol_factory(不带参数**)来创建一个 Protocol,创建一个双向流 Transport 来表示连接的网络端,并将两者绑定在一起,方法是调用 protocol.connection_made(transport)

    (**) 请参阅 create_connection() 的先前脚注。但是,由于 protocol_factory() 每次为每个新的传入连接调用一次,因此它应该每次调用时返回一个新的 Protocol 对象。

    <options> 全部使用可选关键字参数指定

    • ssl:传递一个 ssl.SSLContext 对象(或具有相同接口的对象)来覆盖要使用的默认 SSL 上下文对象。(与 create_connection() 不同,在此传递 True 毫无意义 - 需要 SSLContext 对象来指定证书和密钥)。
    • backlog: 传递给 listen() 调用的待处理连接数。默认值取决于实现;在默认实现中,默认值为 100
    • reuse_address: 是否在套接字上设置 SO_REUSEADDR 选项。默认情况下,UNIX 上为 True,Windows 上为 False
    • family, flags: 传递给 getaddrinfo() 的地址族和标志。地址族默认值为 AF_UNSPEC;标志默认值为 AI_PASSIVE。(套接字类型始终为 SOCK_STREAM;套接字协议始终设置为 0,以便让 getaddrinfo() 选择。)
    • sock: 一个可选的套接字,用于代替使用 hostportfamilyflags 参数。如果提供了这个参数,则必须将 hostport 显式设置为 None
  • create_datagram_endpoint(protocol_factory, local_addr=None, remote_addr=None, <options>). 创建一个用于发送和接收数据报(通常是 UDP 数据包)的端点。由于数据报流量的性质,没有单独的调用来设置客户端和服务器端,因为通常单个端点充当客户端和服务器。这是一个协程,在成功时返回一个 (transport, protocol) 对,或在失败时引发异常。如果协程成功返回,则传输将在每次收到数据报或套接字关闭时在协议上调用回调;协议负责调用协议上的方法来发送数据报。返回的传输是一个 DatagramTransport。返回的协议是一个 DatagramProtocol。这些将在后面介绍。

    必须的定位参数

    • protocol_factory: 一个类或工厂函数,将在没有参数的情况下恰好调用一次,以构建要返回的协议对象。数据报传输和协议之间的接口将在下面介绍。

    可以按位置或关键字参数指定的选择参数

    • local_addr: 一个可选的元组,指示套接字将绑定的地址。如果给出,则必须是一个 (host, port) 对。它将被传递给 getaddrinfo() 以进行解析,解析结果将被传递给创建的套接字的 bind() 方法。如果 getaddrinfo() 返回多个地址,它们将依次尝试。如果省略,将不会进行任何 bind() 调用。
    • remote_addr: 一个可选的元组,指示套接字将“连接”到的地址。(由于没有数据报连接这样的东西,因此这只是指定了传出数据报目的地址的默认值。)如果给出,则必须是一个 (host, port) 对。它将被传递给 getaddrinfo() 以进行解析,解析结果将与创建的套接字一起被传递给 sock_connect()。如果 getaddrinfo() 返回多个地址,它们将依次尝试。如果省略,将不会进行任何 sock_connect() 调用。

    <options> 全部使用可选关键字参数指定

    • family, proto, flags: 传递给 getaddrinfo() 的地址族、协议和标志。它们都默认为 0,这意味着“未指定”。(套接字类型始终为 SOCK_DGRAM。)如果未指定这些值中的任何一个,则 getaddrinfo() 方法将选择适当的值。

    请注意,如果 local_addrremote_addr 都存在,则将尝试所有具有匹配地址族的本地和远程地址组合。

包装套接字方法

以下用于对套接字进行异步 I/O 的方法并非用于一般用途。它们主要用于通过 ProactorEventLoop 类与 IOCP 配合工作的传输实现。但是,它们很容易为其他事件循环类型实现,因此没有理由不强制要求它们。套接字参数必须是非阻塞套接字。

  • sock_recv(sock, n). 从套接字 sock 接收最多 n 字节。返回一个 Future,其成功时的结果将是一个字节对象。
  • sock_sendall(sock, data). 将字节 data 发送到套接字 sock。返回一个 Future,其成功时的结果将是 None。注意:名称使用 sendall 而不是 send,以反映该方法的语义和签名与标准库套接字方法 sendall() 而不是 send() 相呼应。
  • sock_connect(sock, address). 连接到给定的地址。返回一个 Future,其成功时的结果将是 None
  • sock_accept(sock). 接受来自套接字的连接。套接字必须处于监听模式并绑定到地址。返回一个 Future,其成功时的结果将是一个元组 (conn, peer),其中 conn 是一个已连接的非阻塞套接字,而 peer 是对等方地址。

I/O 回调

这些方法主要用于与选择器配合工作的传输实现。它们由 SelectorEventLoop 实现,但未由 ProactorEventLoop 实现。自定义事件循环实现可能实现它们,也可能不实现它们。

下面的 fd 参数可以是整型文件描述符,也可以是具有 fileno() 方法的“文件类”对象,这些对象包装了整型文件描述符。并非所有文件类对象或文件描述符都是可接受的。套接字(和套接字文件描述符)始终被接受。在 Windows 上,不支持其他类型。在 UNIX 上,还支持管道和可能支持 tty 设备,但不支持磁盘文件。支持的特殊文件类型可能因平台和选择器实现而异。(根据经验,OS X 上至少有一种伪 tty 类型受 selectpoll 支持,但不支持 kqueue:它由 Emacs shell 窗口使用。)

  • add_reader(fd, callback, *args). 安排在文件描述符 fd 被认为准备就绪以进行读取时调用 callback(*args)。对同一个文件描述符再次调用 add_reader() 意味着对同一个文件描述符调用 remove_reader()
  • add_writer(fd, callback, *args). 与 add_reader() 相似,但为写入而不是读取注册回调。
  • remove_reader(fd). 取消文件描述符 fd 的当前读取回调(如果有)。如果当前未为文件描述符设置回调,则这是一个空操作,并将返回 False。否则,它将删除回调安排并返回 True
  • remove_writer(fd). 这与 add_writer() 的关系类似于 remove_reader()add_reader() 的关系。

管道和子进程

这些方法受 UNIX 上的 SelectorEventLoop 和 Windows 上的 ProactorEventLoop 支持。

与管道和子进程一起使用的传输和协议不同于与常规流连接一起使用的传输和协议。这些将在后面介绍。

下面的每个方法都有一个 protocol_factory 参数,类似于 create_connection();它将在没有参数的情况下恰好调用一次,以构建要返回的协议对象。

每个方法都是一个协程,在成功时返回一个 (transport, protocol) 对,或在失败时引发异常。

  • connect_read_pipe(protocol_factory, pipe): 从一个包装了 UNIX 管道读取端的“文件类”对象创建一个单向流连接,该管道必须处于非阻塞模式。返回的传输是一个 ReadTransport
  • connect_write_pipe(protocol_factory, pipe): 从包装 UNIX 管道写入端的类文件对象创建一个单向流连接,该管道必须处于非阻塞模式。返回的传输是 WriteTransport;它没有任何与读取相关的 方法。返回的协议是 BaseProtocol
  • subprocess_shell(protocol_factory, cmd, <options>): 从 cmd 创建一个子进程,这是一个使用平台的“shell”语法的字符串。这类似于标准库 subprocess.Popen() 类,它使用 shell=True 调用。其余参数和返回值将在下面描述。
  • subprocess_exec(protocol_factory, *args, <options>): 从一个或多个字符串参数创建一个子进程,其中第一个字符串指定要执行的程序,其余字符串指定程序的参数。 (因此,假设它是一个 Python 脚本,这些字符串参数共同构成了程序的 sys.argv 值。)这类似于标准库 subprocess.Popen() 类,它使用 shell=False 调用并使用字符串列表作为第一个参数;但是,Popen() 接受一个字符串列表作为单个参数,而 subprocess_exec() 接受多个字符串参数。其余参数和返回值将在下面描述。

除了指定要执行的程序的方式之外,两个 subprocess_*() 方法的行为相同。返回的传输是 SubprocessTransport,它具有与常见的双向流传输不同的接口。返回的协议是 SubprocessProtocol,它也具有自定义接口。

<options> 全部使用可选关键字参数指定

  • stdin: 或者是一个类文件对象,代表要使用 connect_write_pipe() 连接到子进程的标准输入流的管道,或者是一个常量 subprocess.PIPE(默认值)。默认情况下,将创建一个新的管道并连接。
  • stdout: 或者是一个类文件对象,代表要使用 connect_read_pipe() 连接到子进程的标准输出流的管道,或者是一个常量 subprocess.PIPE(默认值)。默认情况下,将创建一个新的管道并连接。
  • stderr: 或者是一个类文件对象,代表要使用 connect_read_pipe() 连接到子进程的标准错误流的管道,或者是一个常量 subprocess.PIPE(默认值)或 subprocess.STDOUT。默认情况下,将创建一个新的管道并连接。当指定 subprocess.STDOUT 时,子进程的标准错误流将连接到与标准输出流相同的管道。
  • bufsize: 创建管道时要使用的缓冲区大小;这将传递给 subprocess.Popen()。在默认实现中,这默认为零,在 Windows 上它必须为零;这些默认值与 subprocess.Popen() 不同。
  • executable, preexec_fn, close_fds, cwd, env, startupinfo, creationflags, restore_signals, start_new_session, pass_fds: 这些可选参数将传递给 subprocess.Popen() 而不进行解释。

信号回调

这些方法仅在 UNIX 上受支持。

  • add_signal_handler(sig, callback, *args). 每当收到信号 sig 时,安排调用 callback(*args)。为同一信号指定另一个回调将替换先前的处理程序(每个信号只能有一个处理程序处于活动状态)。sig 必须是在 signal 模块中定义的有效信号编号。如果无法处理信号,则会引发异常:如果它不是有效信号或它是无法捕获的信号(例如 SIGKILL),则引发 ValueError,如果此特定事件循环实例无法处理信号,则引发 RuntimeError(因为信号在每个进程中是全局的,所以只有与主线程关联的事件循环才能处理信号)。
  • remove_signal_handler(sig). 删除信号 sig 的处理程序(如果已设置)。引发与 add_signal_handler() 相同的异常(除了它可能会返回 False 而不是为无法捕获的信号引发 RuntimeError)。如果成功删除了处理程序,则返回 True,如果未设置处理程序,则返回 False

注意:如果静态地知道这些方法不受支持,它们可能会引发 NotImplementedError 而不是 RuntimeError

回调的互斥

事件循环应该强制执行回调的互斥,即它永远不应该在先前回调仍在运行时启动回调。这应该适用于所有类型的回调,无论它们是使用 call_soon()call_later()call_at()call_soon_threadsafe()add_reader()add_writer()add_signal_handler() 调度的。

异常

Python 中有两种类型的异常:从 Exception 类派生的异常和从 BaseException 类派生的异常。从 Exception 派生的异常通常会被捕获并适当地处理;例如,它们将被 Futures 传递,并且当它们在回调中发生时,它们将被记录并忽略。

但是,仅从 BaseException 派生的异常通常不会被捕获,并且通常会导致程序以跟踪记录终止。在某些情况下,它们会被捕获并重新引发。(此类别的示例包括 KeyboardInterruptSystemExit;通常明智的做法是不将它们与大多数其他异常相同对待。)

事件循环将后一类传递到其异常处理程序中。这是一个回调,它接受上下文字典作为参数

def exception_handler(context):
    ...

上下文可能具有许多不同的键,但其中一些被广泛使用

  • 'message': 错误消息。
  • 'exception': 异常实例;如果不存在异常,则为 None
  • 'source_traceback': 一个字符串列表,表示创建与错误相关的对象的点的堆栈。
  • 'handle_traceback': 一个字符串列表,表示创建与错误相关的句柄的时刻的堆栈。

循环具有以下与异常处理相关的 方法

  • get_exception_handler() 返回为循环注册的当前异常处理程序。
  • set_exception_handler(handler) 设置异常处理程序。
  • default_exception_handler(context) 此循环实现的默认异常处理程序。
  • call_exception_handler(context)上下文传递给已注册的异常处理程序。这允许第三方库统一处理未捕获的异常。

    如果默认值未被显式 set_exception_handler() 调用覆盖,则循环使用 default_exception_handler()

调试模式

默认情况下,循环在发布模式下运行。应用程序可以通过启用调试模式以更好的错误报告为代价来提高性能。

在调试模式下,将启用许多其他检查,例如

  • 源跟踪记录可用于期货/任务中的未处理异常。
  • 循环检查缓慢的回调以检测意外的 I/O 阻塞。

    loop.slow_callback_duration 属性控制在两个yield 点之间允许的最大执行时间,在报告缓慢的回调之前。默认值为 0.1 秒;可以通过分配给它来更改它。

有两个与调试模式相关的 方法

  • get_debug() 如果启用了调试模式,则返回 True,否则返回 False
  • set_debug(enabled) 如果参数为 True,则启用调试模式。

如果定义了 PYTHONASYNCIODEBUG 环境变量并且它不为空,则会自动启用调试模式。

句柄

注册一次性回调的各种方法(call_soon()call_later()call_at()call_soon_threadsafe())都返回一个表示注册的对象,该对象可用于取消回调。此对象称为 Handle。Handles 是不透明的,只有一个公共方法

  • cancel():取消回调。

请注意,add_reader()add_writer()add_signal_handler() 不会返回 Handles。

服务器

create_server() 方法返回一个 Server 实例,该实例包装了用于接受请求的套接字(或其他网络对象)。此类有两个公共方法

  • close():关闭服务。这将停止接受新请求,但不会取消已接受并正在处理的请求。
  • wait_closed():一个协程,它阻塞直到服务关闭且所有已接受的请求都已处理。

期货

这里的 asyncio.Future 类有意与 PEP 3148 中指定的 concurrent.futures.Future 类类似,但有一些细微的差别。无论何时本 PEP 谈论 Futures 或 futures,都应理解为是指 asyncio.Future,除非明确提及 concurrent.futures.Future。支持的公共 API 如下所示,指出了与 PEP 3148 的区别

  • cancel()。如果 Future 已经完成(或已取消),则不做任何操作并返回 False。否则,这将尝试取消 Future 并返回 True。如果取消尝试成功,最终 Future 的状态将变为取消(以便 cancelled() 将返回 True),并且回调将被调度。对于常规 Futures,取消将始终立即成功;但对于 Tasks(见下文),任务可能会忽略或延迟取消尝试。
  • cancelled()。如果 Future 被成功取消,则返回 True
  • done()。如果 Future 完成,则返回 True。请注意,已取消的 Future 也被认为已完成(在此处和所有地方)。
  • result()。返回使用 set_result() 设置的结果集,或引发使用 set_exception() 设置的异常。如果取消,则引发 CancelledError。与 PEP 3148 的区别:这没有超时参数并且 *不* 等待;如果 future 尚未完成,它将引发异常。
  • exception()。如果使用 set_exception() 设置了异常,则返回该异常,或者如果使用 set_result() 设置了结果,则返回 None。如果取消,则引发 CancelledError。与 PEP 3148 的区别:这没有超时参数并且 *不* 等待;如果 future 尚未完成,它将引发异常。
  • add_done_callback(fn)。添加一个回调,在 Future 完成(或被取消)时运行。如果 Future 已经完成(或已取消),则使用 call_soon() 调度回调。与 PEP 3148 的区别:回调永远不会立即调用,并且始终在调用者的上下文中调用 - 通常这是线程。您可以将其视为通过 call_soon() 调用回调。请注意,为了与 PEP 3148 匹配,回调(与本 PEP 中定义的所有其他回调不同,并忽略以下“回调样式”部分中的约定)始终使用单个参数(Future 对象)调用。(严格序列化使用 call_soon() 调度的回调的动机也适用于此。)
  • remove_done_callback(fn)。从回调列表中删除参数。此方法未由 PEP 3148 定义。该参数必须等于(使用 ==)传递给 add_done_callback() 的参数。返回回调被移除的次数。
  • set_result(result)。Future 必须尚未完成(也未取消)。这使 Future 完成并调度回调。与 PEP 3148 的区别:这是一个公共 API。
  • set_exception(exception)。Future 必须尚未完成(也未取消)。这使 Future 完成并调度回调。与 PEP 3148 的区别:这是一个公共 API。

内部方法 set_running_or_notify_cancel() 不受支持;无法设置运行状态。同样,方法 running() 也不受支持。

定义了以下异常

  • InvalidStateError。每当 Future 不处于对正在调用的方法可接受的状态时引发(例如,对已经完成的 Future 调用 set_result(),或者对尚未完成的 Future 调用 result())。
  • InvalidTimeoutError。当给出非零 timeout 参数时,由 result()exception() 引发。
  • CancelledErrorconcurrent.futures.CancelledError 的别名。当对已取消的 Future 调用 result()exception() 时引发。
  • TimeoutErrorconcurrent.futures.TimeoutError 的别名。可能会由 run_until_complete() 引发。

Future 在创建时与事件循环相关联。

asyncio.Future 对象对于 concurrent.futures 包中的 wait()as_completed() 函数不可接受。但是,存在类似的 API asyncio.wait()asyncio.as_completed(),如下所述。

asyncio.Future 对象在协程中使用时对 yield from 表达式是可接受的。这是通过 Future 上的 __iter__() 接口实现的。请参见以下“协程和调度程序”部分。

当 Future 被垃圾回收时,如果它具有关联的异常,但从未调用过 result()exception(),则记录异常。(当协程使用 yield from 等待 Future 时,协程恢复后会调用该 Future 的 result() 方法。)

将来(双关语),我们可能会统一 asyncio.Futureconcurrent.futures.Future,例如,通过向后者添加一个 __iter__() 方法,该方法适用于 yield from。为了防止在尚未完成的 Future 上调用例如 result() 时意外阻塞事件循环,阻塞操作可能会检测到当前线程中是否有活动事件循环,并改为引发异常。但是,当前 PEP 努力不依赖于 Python 3.3 之外的任何内容,因此目前无法更改 concurrent.futures.Future

有一些与 Futures 相关的公共函数

  • asyncio.async(arg)。这将接收一个参数,该参数是协程对象或 Future(即,任何可以使用 yield from 的内容),并返回一个 Future。如果参数是 Future,则返回不变;如果它是协程对象,则将其包装在 Task 中(请记住,TaskFuture 的子类)。
  • asyncio.wrap_future(future)。这将接收一个 PEP 3148 Future(即,concurrent.futures.Future 的实例),并返回一个与事件循环兼容的 Future(即,asyncio.Future 实例)。

传输

传输和协议受 Twisted 和 PEP 3153 的强烈影响。用户很少实现或实例化传输 - 相反,事件循环提供了设置传输的实用程序方法。

传输与协议协同工作。协议通常在不知道或不关心所用传输的具体类型的情况下编写,并且传输可以与各种协议一起使用。例如,HTTP 客户端协议实现可以使用普通套接字传输或 SSL/TLS 传输。普通套接字传输除了 HTTP 外,还可以与许多不同的协议一起使用(例如 SMTP、IMAP、POP、FTP、IRC、SPDY)。

最常见的传输类型是双向流传输。还存在单向流传输(用于管道)和数据报传输(由 create_datagram_endpoint() 方法使用)。

所有传输的方法

  • get_extra_info(name, default=None)。这是一个万能方法,它返回有关传输的特定于实现的信息。第一个参数是要检索的额外字段的名称。可选的第二个参数是要返回的默认值。请参阅实现文档以了解支持的额外字段名称。对于不受支持的名称,始终返回默认值。

双向流传输

双向流传输是在套接字或类似事物(例如一对 UNIX 管道或 SSL/TLS 连接)之上的抽象。

大多数连接具有非对称性质:客户端和服务器通常具有截然不同的角色和行为。因此,传输和协议之间的接口也是非对称的。从协议的角度来看,写入数据是通过在传输对象上调用 write() 方法完成的;这将缓冲数据并立即返回。但是,传输在读取数据方面扮演更积极的角色:每当从套接字(或其他数据源)读取一些数据时,传输都会调用协议的 data_received() 方法。

尽管如此,双向流使用的传输和协议之间的接口对于客户端和服务器来说是相同的,因为客户端和服务器之间的连接本质上是一对流,每个方向一个。

双向流传输具有以下公共方法

  • write(data)。写入一些字节。参数必须是字节对象。返回 None。传输可以自由地缓冲字节,但它最终必须将字节传输到另一端,并且必须保持流行为。也就是说,t.write(b'abc'); t.write(b'def') 等效于 t.write(b'abcdef'),以及
    t.write(b'a')
    t.write(b'b')
    t.write(b'c')
    t.write(b'd')
    t.write(b'e')
    t.write(b'f')
    
  • writelines(iterable)。等效于
    for data in iterable:
        self.write(data)
    
  • write_eof()。关闭连接的写入端。不允许对 write() 的后续调用。一旦所有缓冲的数据被传输,传输就会向另一端发出信号,表示不会再接收更多数据。某些协议不支持此操作;在这种情况下,调用 write_eof() 将引发异常。(注意:这曾经被称为 half_close(),但除非你已经知道它的用途,否则这个名称并不能说明一端是关闭的。)
  • can_write_eof()。如果协议支持 write_eof(),则返回 True,否则返回 False。(此方法通常返回一个固定值,该值仅取决于特定的 Transport 类,而不是 Transport 对象的状态。它之所以需要,是因为某些协议在 write_eof() 不可使用时需要改变其行为。例如,在 HTTP 中,为了发送大小未知的数据,数据结束通常使用 write_eof() 来指示;但是,SSL/TLS 不支持这一点,HTTP 协议实现在这种情况下必须使用“分块”传输编码。但是,如果数据大小事先已知,则在两种情况下最佳方法都是使用 Content-Length 标头。)
  • get_write_buffer_size()。返回传输写入缓冲区的当前大小(以字节为单位)。这只知道传输显式管理的写入缓冲区;不会报告网络栈或其他网络位置的其他层中的缓冲。
  • set_write_buffer_limits(high=None, low=None)。设置流量控制的高水位和低水位限制。

    这两个值控制何时调用协议的 pause_writing()resume_writing() 方法。如果指定,低水位限制必须小于或等于高水位限制。两个值都不能为负数。

    默认值是特定于实现的。如果只给出高水位限制,则低水位限制默认为特定于实现的值,该值小于或等于高水位限制。将 high 设置为零会强制 low 也设置为零,并导致每当缓冲区变为非空时调用 pause_writing()。将 low 设置为零会导致仅在缓冲区为空时调用 resume_writing()。对任一限制使用零通常不是最佳的,因为它减少了同时进行 I/O 和计算的机会。

  • pause_reading()。暂停将数据传递到协议,直到后续的 resume_reading() 调用。在 pause_reading()resume_reading() 之间,不会调用协议的 data_received() 方法。
  • resume_reading()。通过 data_received() 重新启动将数据传递到协议。请注意,“暂停”是一个二进制状态——只有在传输未暂停时才应调用 pause_reading(),而只有在传输暂停时才应调用 resume_reading()
  • close()。断开与另一端的实体的连接。所有由 write() 缓冲的数据将(最终)在连接实际关闭之前被传输。不会再调用协议的 data_received() 方法。一旦所有缓冲的数据都被刷新,协议的 connection_lost() 方法将使用 None 作为参数被调用。请注意,此方法不会等待所有这些操作完成。
  • abort()。立即断开连接。任何仍在传输中缓冲的数据都会被丢弃。很快,协议的 connection_lost() 方法将使用 None 作为参数被调用。

单向流传输

写入流传输支持针对双向流传输描述的 write()writelines()write_eof()can_write_eof()close()abort() 方法。

读取流传输支持针对双向流传输描述的 pause_reading()resume_reading()close() 方法。

写入流传输仅在关联的协议上调用 connection_made()connection_lost()

读取流传输可以调用下面协议部分中指定的协议方法(即前两个加上 data_received()eof_received())。

数据报传输

数据报传输具有以下方法

  • sendto(data, addr=None)。发送数据报(字节对象)。可选的第二个参数是目标地址。如果省略,则必须在创建此传输的 create_datagram_endpoint() 调用中指定 remote_addr。如果存在并且指定了 remote_addr,则它们必须匹配。(data,addr)对可以立即发送或缓冲。返回值是 None
  • abort()。立即关闭传输。缓冲的数据将被丢弃。
  • close()。关闭传输。缓冲的数据将异步传输。

数据报传输在关联的协议对象上调用以下方法:connection_made()connection_lost()error_received()datagram_received()。(这些方法名称中的“连接”是一个轻微的误称,但这些概念仍然存在:connection_made() 表示表示端点的传输已创建,而 connection_lost() 表示传输已关闭。)

子进程传输

子进程传输具有以下方法

  • get_pid()。返回子进程的进程 ID。
  • get_returncode()。如果进程已退出,则返回进程退出代码;否则返回 None
  • get_pipe_transport(fd)。返回与参数相对应的管道传输(单向流传输),参数应为 0、1 或 2,分别代表 stdin、stdout 或 stderr(子进程)。如果没有这样的管道传输,则返回 None。对于 stdin,这是一个写入传输;对于 stdout 和 stderr,这是一个读取传输。你必须使用此方法来获取可以用来写入子进程 stdin 的传输。
  • send_signal(signal)。向子进程发送信号。
  • terminate()。终止子进程。
  • kill()。杀死子进程。在 Windows 上,这是 terminate() 的别名。
  • close()。这是 terminate() 的别名。

请注意,send_signal()terminate()kill() 封装了标准库 subprocess 模块中的相应方法。

协议

协议始终与传输一起使用。虽然提供了一些常见的协议(例如,相当不错但未必优秀的 HTTP 客户端和服务器实现),但大多数协议将由用户代码或第三方库实现。

与传输类似,我们区分流协议、数据报协议,以及其他自定义协议。最常见的协议类型是双向流协议。(没有单向协议。)

流协议

(双向)流协议必须实现以下方法,这些方法将由传输调用。可以将它们视为始终由事件循环在正确上下文中调用的回调。(参见上面的“上下文”部分。)

  • connection_made(transport)。表示传输已准备好并已连接到另一端的实体。协议应该将传输引用保存为实例变量(以便以后可以调用其 write() 和其他方法),并且可能在此时写入初始问候或请求。
  • data_received(data)。传输已从连接中读取了一些字节。参数始终是一个非空字节对象。对于以这种方式传递的数据的最小或最大大小没有保证。p.data_received(b'abcdef') 应该被视为与
    p.data_received(b'abc')
    p.data_received(b'def')
    
  • eof_received()。当另一端调用 write_eof()(或类似的东西)时调用。如果它返回一个假值(包括 None),传输将关闭自身。如果它返回一个真值,则由协议负责关闭传输。但是,对于 SSL/TLS 连接,这将被忽略,因为 TLS 标准要求在接收到“关闭警报”后,不再发送任何数据,并且连接将关闭。

    默认实现返回 None

  • pause_writing()。要求协议暂时停止向传输写入数据。是否理会该请求是可选的,但如果您继续写入,传输的缓冲区可能会无限增长。通过传输的 set_write_buffer_limits() 方法可以控制调用此方法时的缓冲区大小。
  • resume_writing()。告诉协议可以安全地开始再次向传输写入数据。请注意,这可能由传输的 write() 方法直接调用(而不是通过 call_soon() 间接调用),以便协议在 write() 返回后立即意识到其暂停状态。
  • connection_lost(exc)。传输已关闭或中止,已检测到另一端已干净地关闭连接,或已遇到意外错误。在前三种情况下,参数为 None;对于意外错误,参数是导致传输放弃的异常。

以下是一个表格,指示基本调用的顺序和数量

  1. connection_made() – 恰好一次
  2. data_received() – 零次或多次
  3. eof_received() – 最多一次
  4. connection_lost() – 恰好一次

pause_writing()resume_writing() 的调用成对出现,并且只发生在 #1 和 #4 之间。这些对不会嵌套。最后的 resume_writing() 调用可以省略;也就是说,暂停的连接可能会丢失,并且永远不会恢复。

数据报协议

数据报协议具有 connection_made()connection_lost() 方法,其签名与流协议相同。(如关于数据报传输的部分所述,我们更喜欢稍微奇怪的命名法,而不是定义不同的方法名称来指示套接字的打开和关闭。)

此外,它们还具有以下方法

  • datagram_received(data, addr)。表示从远程地址 addr(IPv4 2 元组或 IPv6 4 元组)接收了一个数据报 data(一个字节对象)。
  • error_received(exc)。表示发送或接收操作引发了 OSError 异常。由于数据报错误可能是暂时的,因此由协议决定是否调用传输的 close() 方法来关闭端点。

以下是一个图表,指示调用的顺序和数量

  1. connection_made() – 恰好一次
  2. datagram_received()error_received() – 零次或多次
  3. connection_lost() – 恰好一次

子进程协议

子进程协议具有 connection_made()connection_lost()pause_writing()resume_writing() 方法,其签名与流协议相同。此外,它们还具有以下方法

  • pipe_data_received(fd, data)。当子进程向其标准输出或标准错误写入数据时调用。fd 是文件描述符(标准输出为 1,标准错误为 2)。data 是一个 bytes 对象。
  • pipe_connection_lost(fd, exc)。当子进程关闭其标准输入、标准输出或标准错误时调用。fd 是文件描述符。exc 是一个异常或 None
  • process_exited()。当子进程退出时调用。要检索退出状态,请使用传输的 get_returncode() 方法。

请注意,根据子进程的行为,process_exited() 可能在 pipe_connection_lost() 之前或之后被调用。例如,如果子进程创建了一个共享其标准输入/标准输出/标准错误的子子进程,然后自身退出,则在所有管道都还打开时,可能会调用 process_exited()。另一方面,当子进程关闭其标准输入/标准输出/标准错误,但没有退出时,可能会在没有调用 process_exited() 的情况下,对所有三个管道调用 pipe_connection_lost()。如果(更常见的情况是)子进程退出并因此隐式地关闭所有管道,则调用顺序是未定义的。

回调风格

大多数接受回调的接口也接受位置参数。例如,要安排 foo("abc", 42) 尽快调用,您可以调用 loop.call_soon(foo, "abc", 42)。要安排调用 foo(),请使用 loop.call_soon(foo)。此约定极大地减少了典型回调编程中所需的少量 lambda 的数量。

此约定专门 *不支持* 关键字参数。关键字参数用于传递有关回调的可选额外信息。这允许 API 优雅地演进,而不必担心关键字是否对某个地方的被调用者具有重要意义。如果您有一个 *必须* 使用关键字参数调用的回调,可以使用 lambda。例如

loop.call_soon(lambda: foo('abc', repeat=42))

协程和调度程序

这是一个独立的顶层部分,因为它的状态与事件循环接口不同。协程的使用是可选的,完全可以使用仅回调的方式编写代码。另一方面,调度程序/协程 API 只有一个实现,如果您使用协程,那就是您正在使用的那个。

协程

协程是一个遵循某些约定的生成器。出于文档目的,所有协程都应该用 @asyncio.coroutine 装饰,但这无法严格强制执行。

协程使用 PEP 380 中引入的 yield from 语法,而不是原始的 yield 语法。

“协程”一词,与“生成器”一词一样,用于两个不同的(但相关的)概念

  • 定义协程的函数(用 asyncio.coroutine 装饰的函数定义)。如果需要消除歧义,我们将称之为 *协程函数*。
  • 通过调用协程函数获得的对象。此对象表示最终将完成的计算或 I/O 操作(通常是两者兼而有之)。如果需要消除歧义,我们将称之为 *协程对象*。

协程可以做的事情

  • result = yield from future – 暂停协程,直到期货完成,然后返回期货的结果,或者引发一个异常,该异常将被传播。(如果期货被取消,它将引发一个 CancelledError 异常。)请注意,任务是期货,关于期货的所有内容也适用于任务。
  • result = yield from coroutine – 等待另一个协程生成结果(或引发一个异常,该异常将被传播)。coroutine 表达式必须是另一个协程的 *调用*。
  • return expression – 向正在使用 yield from 等待此协程的协程生成结果。
  • raise exception – 使用 yield from 在等待此协程的协程中引发异常。

调用协程不会立即启动其代码运行 - 它只是一个生成器,调用返回的协程对象实际上是一个生成器对象,在你对其进行迭代之前它不会执行任何操作。对于协程对象,有两种基本方法可以启动它的运行:从另一个协程调用 yield from coroutine(假设另一个协程已经在运行!),或者将其转换为 Task(见下文)。

协程(和任务)只有在事件循环正在运行时才能运行。

等待多个协程

要等待多个协程或 Futures,提供了两个类似于 concurrent.futures 包中的 wait()as_completed() API 的 API

  • asyncio.wait(fs, timeout=None, return_when=ALL_COMPLETED)。这是一个协程,它等待由 fs 给出的 Futures 或协程完成。协程参数将被包装在 Tasks 中(见下文)。它返回一个 Future,其成功的结果是一个包含两个 Futures 集合的元组,(done, pending),其中 done 是已完成(或已取消)的原始 Futures(或包装的协程)的集合,而 pending 是其余的,即那些尚未完成(或尚未取消)的。请注意,使用 timeoutreturn_when 的默认值,done 将始终是一个空列表。可选参数 timeoutreturn_when 的含义和默认值与 concurrent.futures.wait() 相同:timeout,如果非 None,则指定整个操作的超时时间;return_when 指定何时停止。常量 FIRST_COMPLETEDFIRST_EXCEPTIONALL_COMPLETED 的定义值和含义与 PEP 3148 中的相同
    • ALL_COMPLETED(默认):等待所有 Futures 完成(或直到超时发生)。
    • FIRST_COMPLETED:等待至少一个 Future 完成(或直到超时发生)。
    • FIRST_EXCEPTION:等待至少一个 Future 完成但未被取消,并且设置了异常。(从条件中排除已取消的 Futures 令人惊讶,但 PEP 3148 这样做。)
  • asyncio.as_completed(fs, timeout=None)。返回一个迭代器,其值为 Futures 或协程;等待连续的值将等待直到来自集合 fs 的下一个 Future 或协程完成,并返回其结果(或引发其异常)。可选参数 timeout 的含义和默认值与 concurrent.futures.wait() 相同:当超时发生时,迭代器返回的下一个 Future 将在等待时引发 TimeoutError。使用示例
    for f in as_completed(fs):
        result = yield from f  # May raise an exception.
        # Use result.
    

    注意:如果你不等待迭代器产生的值,你的 for 循环可能无法取得进展(因为你没有让其他任务运行)。

  • asyncio.wait_for(f, timeout)。这是一种方便的方法,用于等待单个协程或 Future 超时。当超时发生时,它会取消任务并引发 TimeoutError。要避免任务取消,请将其包装在 shield() 中。
  • asyncio.gather(f1, f2, ...)。返回一个 Future,它将等待所有参数(Futures 或协程)完成并返回其对应结果的列表。如果一个或多个参数被取消或引发异常,返回的 Future 将被取消或设置其异常(与第一个参数发生的情况匹配),而其余参数将在后台继续运行。取消返回的 Future 不会影响参数。请注意,协程参数使用 asyncio.async() 转换为 Futures。
  • asyncio.shield(f)。等待一个 Future,使其免受取消的影响。这将返回一个 Future,其结果或异常与参数完全相同;但是,如果返回的 Future 被取消,参数 Future 不会受到影响。

    此函数的一个用例将是一个协程,它为处理 HTTP 服务器中请求的协程缓存查询结果。当客户端取消请求时,我们可能(可以说)希望查询缓存协程继续运行,这样当客户端重新连接时,查询结果(希望)会被缓存。这可以这样编写,例如:

    @asyncio.coroutine
    def handle_request(self, request):
        ...
        cached_query = self.get_cache(...)
        if cached_query is None:
            cached_query = yield from asyncio.shield(self.fill_cache(...))
        ...
    

睡眠

协程 asyncio.sleep(delay) 在给定的时间延迟后返回。

任务

Task 是一个管理独立运行的协程的对象。Task 接口与 Future 接口相同,实际上 TaskFuture 的子类。当它的协程返回或引发异常时,任务将完成;如果它返回一个结果,该结果将成为任务的结果,如果它引发一个异常,该异常将成为任务的异常。

取消尚未完成的任务会向协程抛出一个 asyncio.CancelledError 异常。如果协程没有捕获它(或者如果它重新引发它),任务将被标记为已取消(即,cancelled() 将返回 True);但如果协程以某种方式捕获并忽略了异常,它可能会继续执行(并且 cancelled() 将返回 False)。

任务对于协程和基于回调的框架(如 Twisted)之间的互操作也很有用。在将协程转换为 Task 后,可以向 Task 添加回调。

要将协程转换为任务,请调用协程函数并将生成的协程对象传递给 loop.create_task() 方法。你也可以为此目的使用 asyncio.ensure_future()

你可能会问,为什么不自动将所有协程转换为 Tasks?@asyncio.coroutine 装饰器可以做到这一点。但是,这会大大降低一个协程调用另一个协程(等等)的情况下的速度,因为切换到“裸”协程的开销远小于切换到 Task。

Task 类派生自 Future,添加了新方法

  • current_task(loop=None)。一个类方法,返回事件循环中当前正在运行的任务。如果loopNone,该方法将返回默认循环的当前任务。每个协程都在一个任务上下文中执行,无论是使用 ensure_future()loop.create_task() 创建的 Task,还是通过使用 yield fromawait 从另一个协程调用。此方法在协程外部调用时(例如,在使用 loop.call_later() 调度的回调中)返回 None
  • all_tasks(loop=None)。一个类方法,返回循环的所有活动任务的集合。如果loopNone,则使用默认循环。

调度程序

调度程序没有公开接口。你可以通过使用 yield from futureyield from task 与它交互。事实上,没有一个单独的对象代表调度程序 - 它的行为是由 TaskFuture 类使用事件循环的公开接口实现的,因此它也适用于第三方事件循环实现。

实用工具

提供了一些函数和类来简化基本的基于流的客户端和服务器(如 FTP 或 HTTP)的编写,例如

  • asyncio.open_connection(host, port)EventLoop.create_connection() 的包装器,不需要你提供 Protocol 工厂或类。这是一个协程,它返回一个 (reader, writer) 对,其中 readerStreamReader 的实例,而 writerStreamWriter 的实例(两者都在下面描述)。
  • asyncio.start_server(client_connected_cb, host, port): 是一个用于 EventLoop.create_server() 的包装器,它接受一个简单的回调函数,而不是一个 Protocol 工厂或类。这是一个协程,它返回一个 Server 对象,就像 create_server() 一样。每次接受客户端连接时,都会调用 client_connected_cb(reader, writer),其中 readerStreamReader 的实例,而 writerStreamWriter 的实例(下面都会描述)。如果 client_connected_cb() 返回的结果是一个协程,它会自动包装在一个 Task 中。
  • StreamReader: 一个提供类似于只读二进制流的接口的类,除了各种读取方法都是协程之外。它通常由一个 StreamReaderProtocol 实例驱动。请注意,应该只有一个读取器。读取器的接口是
    • readline(): 一个协程,它读取一个表示以 '\n' 结尾的文本行或直到流结束的字节字符串,以先到者为准。
    • read(n): 一个协程,它最多读取 n 个字节。如果省略或为负数,则读取到流的末尾。
    • readexactly(n): 一个协程,它精确读取 n 个字节,或直到流的末尾,以先到者为准。
    • exception(): 返回使用 set_exception() 在流上设置的异常,或者如果没有设置异常则返回 None。

    驱动程序的接口是

    • feed_data(data): 将 data(一个 bytes 对象)追加到内部缓冲区。如果它提供了足够的数据来满足读取器的协议,这将解除阻塞的读取协程。
    • feed_eof(): 信号缓冲区的末尾。这将解除阻塞的读取协程。在此调用之后,不应再向读取器提供任何数据。
    • set_exception(exc): 在流上设置异常。所有后续读取方法都将引发此异常。在此调用之后,不应再向读取器提供任何数据。
  • StreamWriter: 一个提供类似于只写二进制流的接口的类。它包装了一个传输。该接口是传输接口的扩展子集:以下方法的行为与相应的传输方法相同:write()writelines()write_eof()can_write_eof()get_extra_info()close()。请注意,写入方法不是协程(这与传输相同,但与 StreamReader 类不同)。以下方法是除传输接口之外的
    • drain(): 在写入大量数据后,应使用 yield from 调用此方法,以进行流量控制。预期的用途如下
      writer.write(data)
      yield from writer.drain()
      

      请注意,从技术上讲,这不是一个协程:它返回一个 Future 或一个空元组(两者都可以传递给 yield from)。使用此方法是可选的。但是,当不使用它时, StreamWriter 下面的传输的内部缓冲区可能会填满所有写入写入器的数据。如果一个应用程序没有对写入的数据量有严格的限制,它应该偶尔调用 yield from drain() 以避免填满传输缓冲区。

  • StreamReaderProtocol: 一个协议实现,用作双向流传输/协议接口与 StreamReaderStreamWriter 类之间的适配器。它充当特定 StreamReader 实例的驱动程序,响应各种协议回调调用其方法 feed_data()feed_eof()set_exception()。它还控制 StreamWriter 实例的 drain() 方法的行为。

同步

模仿 threading 模块中的锁、事件、条件和信号量的锁、事件、条件和信号量已实现,可以通过导入 asyncio.locks 子模块来访问。模仿 queue 模块中的队列的队列已实现,可以通过导入 asyncio.queues 子模块来访问。

通常,这些与它们的线程对应物密切相关,但是,阻塞方法(例如锁上的 acquire()、队列上的 put()get())是协程,并且不提供超时参数(可以使用 asyncio.wait_for() 来向阻塞调用添加超时,但是)。

模块中的文档字符串提供了更完整的文档。

以下类由 asyncio.locks 提供。对于除 Event 之外的所有这些,with 语句可以与 yield from 结合使用以获取锁并确保无论如何离开 with 块都会释放锁,如下所示

with (yield from my_lock):
    ...
  • Lock: 一个基本互斥锁,具有方法 acquire()(一个协程)、locked()release()
  • Event: 一个事件变量,具有方法 wait()(一个协程)、set()clear()is_set()
  • Condition: 一个条件变量,具有方法 acquire()wait()wait_for(predicate)(全部三个协程)、locked()release()notify()notify_all()
  • Semaphore: 一个信号量,具有方法 acquire()(一个协程)、locked()release()。构造函数参数是初始值(默认值为 1)。
  • BoundedSemaphore: 一个有界信号量;这类似于 Semaphore,但初始值也是最大值。

队列

以下类和异常由 asyncio.queues 提供。

  • Queue: 一个标准队列,具有方法 get()put()(都是协程)、get_nowait()put_nowait()empty()full()qsize()maxsize()
  • PriorityQueue: Queue 的子类,它按优先级顺序检索条目(最低优先级排在最前面)。
  • LifoQueue: Queue 的子类,它首先检索最近添加的条目。
  • JoinableQueue: Queue 的子类,具有 task_done()join() 方法(后者是一个协程)。
  • EmptyFull: 当分别在空或满的队列上调用 get_nowait()put_nowait() 时引发的异常。

杂项

日志记录

asyncio 包执行的所有日志记录都使用单个 logging.Logger 对象, asyncio.logger。要自定义日志记录,可以使用此对象上的标准 Logger API。(但不要替换该对象。)

SIGCHLD 处理在 UNIX 上

在子进程协议中高效地实现 process_exited() 方法需要一个 SIGCHLD 信号处理程序。但是,信号处理程序只能设置在与主线程关联的事件循环上。为了支持从运行在其他线程中的事件循环中生成子进程,存在一种机制允许在多个事件循环之间共享 SIGCHLD 处理程序。有两个额外的函数,asyncio.get_child_watcher()asyncio.set_child_watcher(),以及事件循环策略上的对应方法。

有两个子进程观察器实现类,FastChildWatcherSafeChildWatcher。两者都使用 SIGCHLDSafeChildWatcher 类是默认使用的;当同时存在许多子进程时,它效率低下。 FastChildWatcher 类效率很高,但它可能会干扰其他生成子进程的代码(无论是 C 代码还是 Python 代码),而这些代码没有使用 asyncio 事件循环。如果你确定没有使用其他生成子进程的代码,要使用快速实现,请在你的主线程中运行以下代码

watcher = asyncio.FastChildWatcher()
asyncio.set_child_watcher(watcher)

愿望清单

(人们一致认为这些功能是可取的,但在 Python 3.4 beta 1 发布时没有可用的实现,并且 Python 3.4 发布周期的剩余部分的功能冻结禁止在这个后期阶段添加它们。但是,它们有望在 Python 3.5 中添加,也许会更早地在 PyPI 发行版中添加。)

  • 支持将 TCP 套接字升级到 SSL/TLS 的“开始 TLS”操作。

以前愿望清单中已经实现的项目(但没有在 PEP 中指定)

  • UNIX 域套接字。
  • 每个循环的错误处理回调。

待解决问题

(注意,这些问题已经通过 PEP 的接受实际上得到了解决,并支持现状。但是,PEP 的临时状态允许在 Python 3.5 中修改这些决定。)

  • 为什么 create_connection()create_datagram_endpoint() 有一个 proto 参数,而 create_server() 没有?为什么 getaddrinfo() 的 family、flag、proto 参数有时为零,有时为命名常量(其值也为零)?
  • 我们需要另一个查询方法来判断循环是否正在停止?
  • Handle 的更完整的公共 API?有什么用例?
  • 调试 API?例如,记录大量信息,或记录不寻常的情况(例如队列填充速度快于排水速度)甚至回调花费太多时间……
  • 我们需要自省 API 吗?例如,询问给定文件描述符的读取回调。或者下一个调度调用是什么时候。或者注册回调的文件描述符列表。现在,这些都需要使用内部机制。
  • 我们需要更多套接字 I/O 方法吗,例如 sock_sendto()sock_recvfrom(),以及其他方法,如 pipe_read()?我想用户可以自己编写(这并不复杂)。
  • 我们可能需要 API 来控制各种超时。例如,我们可能想要限制在 DNS 解析、连接、ssl/tls 握手、空闲连接、关闭/关闭中花费的时间,甚至每个会话。可能只需在某些方法中添加 timeout 关键字参数就足够了,而其他超时可能可以通过巧妙地使用 call_later()Task.cancel() 来实现。但是,某些操作可能需要默认超时,并且我们可能希望全局(即每个事件循环)更改特定操作的默认超时。

参考

致谢

除了 PEP 3153 之外,影响因素还包括 PEP 380 和 Greg Ewing 的 yield from 教程、Twisted、Tornado、ZeroMQ、pyftpdlib 和 wattle(Steve Dower 的反建议)。我之前在 Google App Engine 的 NDB 库中进行的异步支持工作提供了一个重要的起点。

我感谢 2012 年 9 月至 12 月在 python-ideas 上的无数讨论,以及此后在 python-tulip 上的更多讨论;与 Steve Dower 和 Dino Viehland 的一次 Skype 会议;与 Ben Darnell 的电子邮件交流和一次访问;与 Niels Provos(libevent 的最初作者)的会面;以及与一些 Twisted 开发人员的面对面会议(以及频繁的电子邮件交流),包括 Glyph、Brian Warner、David Reid 和 Duncan McGreggor。

实现的贡献者包括 Eli Bendersky、Gustavo Carneiro(Gambit Research)、Saúl Ibarra Corretgé、Geert Jansen、A. Jesse Jiryu Davis、Nikolay Kim、Charles-François Natali、Richard Oudkerk、Antoine Pitrou、Giampaolo Rodolá、Andrew Svetlov 以及提交了错误和/或修复的许多其他人。

感谢 Antoine Pitrou 在担任官方 PEP BDFL 时的反馈。


来源:https://github.com/python/peps/blob/main/peps/pep-3156.rst

上次修改:2024-03-21 03:48:43 GMT