500行代码以内
使用asyncio协程构建网页爬虫

A. Jesse Jiryu Davis 和 Guido van Rossum

A. Jesse Jiryu Davis 是 MongoDB 在纽约的一名高级工程师。他编写了 Motor,一个异步 MongoDB Python 驱动程序,并且是 MongoDB C 驱动程序的首席开发者以及 PyMongo 团队的成员。他为 asyncio 和 Tornado 做出贡献。他的博客地址是 http://emptysqua.re

Guido van Rossum 是 Python 的创建者,Python 是网络内外主要的编程语言之一。Python 社区称他为 BDFL(终身仁慈独裁者),这个头衔直接来自蒙提·派森的滑稽短剧。Guido 的网络主页是 https://pythonlang.cn/~guido/

引言

经典的计算机科学强调高效的算法,这些算法能够尽快完成计算。但是许多网络程序花费的时间不是在计算,而是在保持许多连接处于打开状态,这些连接速度缓慢或事件频率较低。这些程序提出了一个截然不同的挑战:有效地等待大量网络事件。解决此问题的一种现代方法是异步 I/O 或“async”。

本章介绍一个简单的网页爬虫。爬虫是一个典型的异步应用程序,因为它等待许多响应,但几乎不进行计算。它可以同时获取的页面越多,完成的时间就越快。如果它为每个正在进行的请求分配一个线程,那么随着并发请求数量的增加,它将在耗尽内存或其他与线程相关的资源之前耗尽套接字。它通过使用异步 I/O 避免了对线程的需求。

我们分三个阶段介绍这个例子。首先,我们展示一个异步事件循环,并概述一个使用事件循环和回调的爬虫:它非常高效,但将其扩展到更复杂的问题会导致难以管理的意大利面条代码。因此,其次,我们展示 Python 协程既高效又可扩展。我们使用生成器函数在 Python 中实现简单的协程。在第三阶段,我们使用 Python 标准“asyncio”库1中的功能齐全的协程,并使用异步队列来协调它们。

任务

网页爬虫查找并下载网站上的所有页面,也许是为了存档或索引它们。从根 URL 开始,它获取每个页面,解析页面中指向未访问页面的链接,并将这些链接添加到队列中。当它获取到一个没有未访问链接的页面并且队列为空时,它停止。

我们可以通过并发下载多个页面来加快此过程。当爬虫找到新的链接时,它会在单独的套接字上为新页面启动同时获取操作。它在响应到达时解析响应,并将新的链接添加到队列中。可能会出现某个收益递减点,在该点上过多的并发性会降低性能,因此我们限制并发请求的数量,并将剩余的链接保留在队列中,直到某些正在进行的请求完成。

传统方法

我们如何使爬虫并发?传统上,我们会创建一个线程池。每个线程负责一次从套接字下载一个页面。例如,要从 xkcd.com 下载一个页面

def fetch(url):
    sock = socket.socket()
    sock.connect(('xkcd.com', 80))
    request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        chunk = sock.recv(4096)

    # Page is now downloaded.
    links = parse_links(response)
    q.add(links)

默认情况下,套接字操作是阻塞的:当线程调用 connectrecv 等方法时,它会暂停直到操作完成。2 因此,要同时下载多个页面,我们需要多个线程。一个复杂的应用程序通过将空闲线程保存在线程池中来摊销线程创建的成本,然后将它们检出以重用于后续任务;它对连接池中的套接字也执行相同的操作。

然而,线程是昂贵的,操作系统对进程、用户或机器可以拥有的线程数量施加各种硬性限制。在 Jesse 的系统上,一个 Python 线程大约需要 50k 的内存,启动数万个线程会导致故障。如果我们扩展到在并发套接字上进行数万次同时操作,那么我们在耗尽线程之前就会耗尽套接字。每个线程的开销或系统对线程的限制是瓶颈。

在 Dan Kegel 有影响力的文章“C10K 问题”3中,他概述了多线程用于 I/O 并发的局限性。他开头写道:

您不认为现在是时候让 Web 服务器同时处理一万个客户端了吗?毕竟,Web 现在是一个很大的地方了。

Kegel 在 1999 年创造了“C10K”一词。一万个连接现在听起来很小巧,但问题只在规模上发生了变化,本质上没有变化。那时,每个连接使用一个线程来处理 C10K 是不切实际的。现在上限高了好几个数量级。事实上,我们的玩具网页爬虫使用线程就可以正常工作。然而,对于非常大规模的应用程序,拥有数十万个连接,上限仍然存在:有一个限制,超过这个限制,大多数系统仍然可以创建套接字,但已经耗尽了线程。我们如何克服这个问题呢?

Async

异步 I/O 框架使用非阻塞套接字在单个线程上执行并发操作。在我们的异步爬虫中,我们在开始连接到服务器之前将套接字设置为非阻塞的

sock = socket.socket()
sock.setblocking(False)
try:
    sock.connect(('xkcd.com', 80))
except BlockingIOError:
    pass

令人恼火的是,即使非阻塞套接字正常工作,connect 也会抛出异常。此异常复制了底层 C 函数的令人恼火的行为,该函数将 errno 设置为 EINPROGRESS 以告诉您它已经开始。

现在我们的爬虫需要一种方法来知道连接何时建立,以便它可以发送 HTTP 请求。我们可以简单地在紧密循环中不断尝试

request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)
encoded = request.encode('ascii')

while True:
    try:
        sock.send(encoded)
        break  # Done.
    except OSError as e:
        pass

print('sent')

此方法不仅浪费电能,而且无法有效地等待多个套接字上的事件。在古代,BSD Unix 解决此问题的方法是 select,这是一个 C 函数,它等待非阻塞套接字或其中的一小部分数组上发生的事件。如今,对具有大量连接的 Internet 应用程序的需求导致了 poll 等替代方案,然后是 BSD 上的 kqueue 和 Linux 上的 epoll。这些 API 类似于 select,但在处理大量连接时性能良好。

Python 3.4 的 DefaultSelector 使用系统上可用的最佳 select 类函数。要注册有关网络 I/O 的通知,我们创建一个非阻塞套接字并将其注册到默认选择器中

from selectors import DefaultSelector, EVENT_WRITE

selector = DefaultSelector()

sock = socket.socket()
sock.setblocking(False)
try:
    sock.connect(('xkcd.com', 80))
except BlockingIOError:
    pass

def connected():
    selector.unregister(sock.fileno())
    print('connected!')

selector.register(sock.fileno(), EVENT_WRITE, connected)

我们忽略了虚假错误并调用 selector.register,传入套接字的文件描述符和一个表示我们正在等待什么事件的常量。要收到连接建立时的通知,我们传递 EVENT_WRITE:也就是说,我们想知道套接字何时“可写”。我们还传递了一个 Python 函数 connected,当该事件发生时运行。这样的函数被称为回调

我们处理选择器接收到的 I/O 通知,在一个循环中

def loop():
    while True:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()

connected 回调存储为 event_key.data,我们在非阻塞套接字连接后检索并执行它。

与我们上面快速旋转的循环不同,这里的 select 调用会暂停,等待下一个 I/O 事件。然后循环运行正在等待这些事件的回调。尚未完成的操作将保持挂起状态,直到事件循环的某个未来时钟周期。

我们已经展示了什么?我们展示了如何开始一个操作并在操作准备就绪时执行一个回调。异步框架建立在我们展示的两个特性——非阻塞套接字和事件循环——的基础上,以在单个线程上运行并发操作。

我们在这里实现了“并发”,但不是传统意义上的“并行”。也就是说,我们构建了一个小型系统,该系统执行重叠的 I/O。它能够在其他操作正在进行时开始新的操作。它实际上并没有利用多个核心来并行执行计算。但另一方面,此系统是为 I/O 绑定问题而设计的,而不是 CPU 绑定问题。4

因此,我们的事件循环在并发 I/O 方面效率很高,因为它没有为每个连接分配线程资源。但在我们继续之前,必须纠正一个常见的误解,即 async 比多线程更快。通常情况并非如此——实际上,在 Python 中,像我们这样的事件循环在为少量非常活跃的连接提供服务时比多线程略慢。在没有全局解释器锁的运行时中,线程在这样的工作负载下会表现得更好。异步 I/O 适合的是具有许多缓慢或休眠连接且事件频率较低的应用程序。5

使用回调进行编程

使用我们迄今为止构建的简陋异步框架,我们如何构建一个网页爬虫?即使是一个简单的 URL 获取器也很难编写。

我们从全局集合开始,这些集合包含我们尚未获取的 URL 和我们已经看到的 URL

urls_todo = set(['/'])
seen_urls = set(['/'])

seen_urls 集合包含 urls_todo 和已完成的 URL。这两个集合都用根 URL “/” 初始化。

获取页面将需要一系列回调。当套接字连接时,connected 回调触发,并向服务器发送 GET 请求。但随后它必须等待响应,因此它注册了另一个回调。如果在该回调触发时,它还无法读取完整响应,则它会再次注册,依此类推。

让我们将这些回调收集到一个 Fetcher 对象中。它需要一个 URL、一个套接字对象以及一个用于累积响应字节的位置

class Fetcher:
    def __init__(self, url):
        self.response = b''  # Empty array of bytes.
        self.url = url
        self.sock = None

我们首先调用 Fetcher.fetch

    # Method on Fetcher class.
    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('xkcd.com', 80))
        except BlockingIOError:
            pass

        # Register next callback.
        selector.register(self.sock.fileno(),
                          EVENT_WRITE,
                          self.connected)

fetch 方法开始连接套接字。但请注意,该方法在连接建立之前就返回了。它必须将控制权返回给事件循环以等待连接。要理解为什么,假设我们的整个应用程序的结构是这样的

# Begin fetching http://xkcd.com/353/
fetcher = Fetcher('/353/')
fetcher.fetch()

while True:
    events = selector.select()
    for event_key, event_mask in events:
        callback = event_key.data
        callback(event_key, event_mask)

所有事件通知都在事件循环调用 select 时处理。因此,fetch 必须将控制权交给事件循环,以便程序知道套接字何时连接。只有这样,循环才会运行在 fetch 末尾注册的 connected 回调。

以下是 connected 的实现

    # Method on Fetcher class.
    def connected(self, key, mask):
        print('connected!')
        selector.unregister(key.fd)
        request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(self.url)
        self.sock.send(request.encode('ascii'))

        # Register the next callback.
        selector.register(key.fd,
                          EVENT_READ,
                          self.read_response)

该方法发送一个 GET 请求。一个真正的应用程序会检查 send 的返回值,以防无法一次发送整个消息。但是我们的请求很小,我们的应用程序也不复杂。它轻率地调用 send,然后等待响应。当然,它必须注册另一个回调并将控制权交给事件循环。下一个也是最后一个回调 read_response 处理服务器的回复

    # Method on Fetcher class.
    def read_response(self, key, mask):
        global stopped

        chunk = self.sock.recv(4096)  # 4k chunk size.
        if chunk:
            self.response += chunk
        else:
            selector.unregister(key.fd)  # Done reading.
            links = self.parse_links()

            # Python set-logic:
            for link in links.difference(seen_urls):
                urls_todo.add(link)
                Fetcher(link).fetch()  # <- New Fetcher.

            seen_urls.update(links)
            urls_todo.remove(self.url)
            if not urls_todo:
                stopped = True

每次选择器看到套接字“可读”时都会执行回调,这可能意味着两件事:套接字有数据或已关闭。

回调从套接字请求最多 4 千字节的数据。如果准备就绪的数据少于 4 千字节,则 chunk 包含可用的任何数据。如果数据更多,则 chunk 长度为 4 千字节,套接字仍然可读,因此事件循环在下一个时钟周期再次运行此回调。当响应完成时,服务器已关闭套接字,并且 chunk 为空。

parse_links 方法(未显示)返回一个 URL 集合。我们为每个新 URL 启动一个新的 fetcher,没有并发上限。请注意异步编程使用回调的一个不错的特性:我们不需要围绕对共享数据的更改使用互斥锁,例如当我们将链接添加到 seen_urls 时。没有抢占式多任务处理,因此我们不会在代码的任意点被中断。

我们添加了一个全局变量stopped并用它来控制循环。

stopped = False

def loop():
    while not stopped:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()

一旦所有页面都下载完毕,获取器就会停止全局事件循环,程序退出。

这个例子清楚地说明了异步编程的问题:意大利面条代码。我们需要一种方法来表达一系列计算和I/O操作,并安排多个这样的操作序列并发运行。但是如果没有线程,一系列操作就无法收集到一个函数中:每当一个函数开始一个I/O操作时,它都会显式地保存将来需要的任何状态,然后返回。你需要负责思考和编写这些保存状态的代码。

让我们解释一下我们的意思。考虑一下我们如何简单地在具有传统阻塞套接字的线程上获取URL。

# Blocking version.
def fetch(url):
    sock = socket.socket()
    sock.connect(('xkcd.com', 80))
    request = 'GET {} HTTP/1.0\r\nHost: xkcd.com\r\n\r\n'.format(url)
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        chunk = sock.recv(4096)

    # Page is now downloaded.
    links = parse_links(response)
    q.add(links)

这个函数在一个套接字操作和下一个套接字操作之间记住什么状态?它有套接字、URL和累积的response。在线程上运行的函数使用编程语言的基本特性将其临时状态存储在局部变量中,在其栈上。函数还有一个“延续”——也就是说,它计划在I/O完成后执行的代码。运行时通过存储线程的指令指针来记住延续。你不需要考虑在I/O之后恢复这些局部变量和延续。它是内置在语言中的。

但是对于基于回调的异步框架,这些语言特性毫无用处。在等待I/O时,函数必须显式地保存其状态,因为函数在I/O完成之前返回并丢失其栈帧。作为局部变量的替代,我们基于回调的示例将sockresponse存储为self(Fetcher实例)的属性。作为指令指针的替代,它通过注册回调connectedread_response来存储其延续。随着应用程序功能的增长,我们手动跨回调保存的状态的复杂性也随之增加。这种繁重的簿记工作使编码人员容易患上偏头痛。

更糟糕的是,如果回调在调度链中的下一个回调之前抛出异常会发生什么?假设我们在parse_links方法上做得不好,并且它在解析某些HTML时抛出了异常。

Traceback (most recent call last):
  File "loop-with-callbacks.py", line 111, in <module>
    loop()
  File "loop-with-callbacks.py", line 106, in loop
    callback(event_key, event_mask)
  File "loop-with-callbacks.py", line 51, in read_response
    links = self.parse_links()
  File "loop-with-callbacks.py", line 67, in parse_links
    raise Exception('parse error')
Exception: parse error

堆栈跟踪仅显示事件循环正在运行回调。我们不记得导致错误的原因。链条的两端都断裂了:我们忘记了要去哪里以及从哪里来。这种上下文丢失称为“栈撕裂”,在许多情况下,它会使调查人员感到困惑。栈撕裂也阻止我们为一系列回调安装异常处理程序,就像“try / except”块包装函数调用及其树状后代一样。6

因此,即使撇开关于多线程和异步的相对效率的长期争论,也存在关于哪种方法更容易出错的另一场争论:如果你在同步线程时犯了错误,线程容易出现数据竞争,但由于栈撕裂,回调很难调试。

协程

我们用一个承诺来吸引你。可以编写异步代码,将回调的效率与多线程编程的经典外观相结合。这种组合是通过一种称为“协程”的模式实现的。使用Python 3.4的标准asyncio库和一个名为“aiohttp”的包,在协程中获取URL非常直接7

    @asyncio.coroutine
    def fetch(self, url):
        response = yield from self.session.get(url)
        body = yield from response.read()

它也是可扩展的。与每个线程50k的内存和操作系统对线程的硬性限制相比,Python协程在Jesse的系统上几乎只占用3k的内存。Python可以轻松启动数十万个协程。

协程的概念,可以追溯到计算机科学的早期,很简单:它是一个可以暂停和恢复的子程序。线程由操作系统抢占式地进行多任务处理,而协程则协作式地进行多任务处理:它们选择何时暂停以及接下来运行哪个协程。

协程有很多实现;即使在Python中也有几种。Python 3.4中标准“asyncio”库中的协程建立在生成器、Future类和“yield from”语句之上。从Python 3.5开始,协程成为语言本身的原生特性8;但是,理解Python 3.4中首次实现的协程,使用预先存在的语言设施,是解决Python 3.5原生协程的基础。

为了解释Python 3.4基于生成器的协程,我们将对生成器及其在asyncio中如何用作协程进行阐述,并相信你会像我们编写它一样享受阅读它。一旦我们解释了基于生成器的协程,我们将在我们的异步网络爬虫中使用它们。

Python生成器的工作原理

在掌握Python生成器之前,你必须了解常规Python函数的工作原理。通常,当Python函数调用子程序时,子程序会保留控制权,直到它返回或抛出异常。然后控制权返回给调用者。

>>> def foo():
...     bar()
...
>>> def bar():
...     pass

标准Python解释器是用C编写的。执行Python函数的C函数称为,悦耳地,PyEval_EvalFrameEx。它接受一个Python栈帧对象并在帧的上下文中评估Python字节码。以下是foo的字节码。

>>> import dis
>>> dis.dis(foo)
  2           0 LOAD_GLOBAL              0 (bar)
              3 CALL_FUNCTION            0 (0 positional, 0 keyword pair)
              6 POP_TOP
              7 LOAD_CONST               0 (None)
             10 RETURN_VALUE

foo函数将其栈上的bar加载并调用它,然后从栈中弹出其返回值,将None加载到栈上,并返回None

PyEval_EvalFrameEx遇到CALL_FUNCTION字节码时,它会创建一个新的Python栈帧并递归调用:也就是说,它使用新帧递归调用PyEval_EvalFrameEx,该帧用于执行bar

务必理解,Python栈帧是在堆内存中分配的!Python解释器是一个普通的C程序,因此它的栈帧是普通的栈帧。但它操作的Python栈帧在堆上。除了其他意外之外,这意味着Python栈帧可以比其函数调用存活更久。要以交互方式查看这一点,请从bar内部保存当前帧。

>>> import inspect
>>> frame = None
>>> def foo():
...     bar()
...
>>> def bar():
...     global frame
...     frame = inspect.currentframe()
...
>>> foo()
>>> # The frame was executing the code for 'bar'.
>>> frame.f_code.co_name
'bar'
>>> # Its back pointer refers to the frame for 'foo'.
>>> caller_frame = frame.f_back
>>> caller_frame.f_code.co_name
'foo'
Figure 5.1 - Function Calls

图5.1 - 函数调用

现在为Python生成器设置了舞台,它使用相同的构建块——代码对象和栈帧——产生神奇的效果。

这是一个生成器函数。

>>> def gen_fn():
...     result = yield 1
...     print('result of yield: {}'.format(result))
...     result2 = yield 2
...     print('result of 2nd yield: {}'.format(result2))
...     return 'done'
...     

当Python将gen_fn编译成字节码时,它会看到yield语句并知道gen_fn是一个生成器函数,而不是一个常规函数。它设置一个标志来记住这一事实。

>>> # The generator flag is bit position 5.
>>> generator_bit = 1 << 5
>>> bool(gen_fn.__code__.co_flags & generator_bit)
True

当你调用一个生成器函数时,Python会看到生成器标志,并且它实际上不会运行该函数。相反,它会创建一个生成器。

>>> gen = gen_fn()
>>> type(gen)
<class 'generator'>

Python生成器封装了一个栈帧加上对某些代码的引用,即gen_fn的主体。

>>> gen.gi_code.co_name
'gen_fn'

gen_fn的所有生成器调用都指向相同的代码。但每个都有自己的栈帧。这个栈帧不在任何实际的栈上,它位于堆内存中等待使用。

Figure 5.2 - Generators

图5.2 - 生成器

该帧有一个“最后指令”指针,即它最近执行的指令。一开始,最后指令指针是-1,表示生成器尚未开始。

>>> gen.gi_frame.f_lasti
-1

当我们调用send时,生成器到达它的第一个yield并暂停。send的返回值是1,因为这是gen传递给yield表达式的值。

>>> gen.send(None)
1

生成器的指令指针现在位于开始处的3个字节码处,位于56字节编译的Python代码的一部分。

>>> gen.gi_frame.f_lasti
3
>>> len(gen.gi_code.co_code)
56

生成器可以随时从任何函数恢复,因为它的栈帧实际上不在栈上:它在堆上。它在调用层次结构中的位置不是固定的,它不需要遵守常规函数的先入先出执行顺序。它被解放了,像云一样自由飘荡。

我们可以将值“hello”发送到生成器中,它将成为yield表达式的结果,并且生成器将继续运行,直到它产生2。

>>> gen.send('hello')
result of yield: hello
2

它的栈帧现在包含局部变量result

>>> gen.gi_frame.f_locals
{'result': 'hello'}

gen_fn创建的其他生成器将有自己的栈帧和局部变量。

当我们再次调用send时,生成器将从第二个yield处继续,并通过引发特殊的StopIteration异常来结束。

>>> gen.send('goodbye')
result of 2nd yield: goodbye
Traceback (most recent call last):
  File "<input>", line 1, in <module>
StopIteration: done

异常有一个值,它是生成器的返回值:字符串"done"

使用生成器构建协程

因此生成器可以暂停,它可以用一个值恢复,并且它有一个返回值。听起来像一个很好的原语,可以在其上构建异步编程模型,而无需意大利面条般的回调!我们想要构建一个“协程”:一个与程序中的其他例程协作调度的例程。我们的协程将是Python标准“asyncio”库中协程的简化版本。与asyncio一样,我们将使用生成器、期货和“yield from”语句。

首先我们需要一种方法来表示协程正在等待的某个未来结果。一个简化的版本。

class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

期货最初是“挂起的”。它通过调用set_result来“解析”。9

让我们调整我们的获取器以使用期货和协程。我们用回调编写了fetch

class Fetcher:
    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('xkcd.com', 80))
        except BlockingIOError:
            pass
        selector.register(self.sock.fileno(),
                          EVENT_WRITE,
                          self.connected)

    def connected(self, key, mask):
        print('connected!')
        # And so on....

fetch方法开始连接套接字,然后注册回调connected,以便在套接字准备就绪时执行。现在我们可以将这两个步骤合并成一个协程。

    def fetch(self):
        sock = socket.socket()
        sock.setblocking(False)
        try:
            sock.connect(('xkcd.com', 80))
        except BlockingIOError:
            pass

        f = Future()

        def on_connected():
            f.set_result(None)

        selector.register(sock.fileno(),
                          EVENT_WRITE,
                          on_connected)
        yield f
        selector.unregister(sock.fileno())
        print('connected!')

现在fetch是一个生成器函数,而不是一个常规函数,因为它包含一个yield语句。我们创建一个挂起的期货,然后将其产生以暂停fetch,直到套接字准备就绪。内部函数on_connected解析期货。

但是当期货解析时,是什么恢复了生成器?我们需要一个协程驱动程序。让我们称之为“任务”。

class Task:
    def __init__(self, coro):
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        try:
            next_future = self.coro.send(future.result)
        except StopIteration:
            return

        next_future.add_done_callback(self.step)

# Begin fetching http://xkcd.com/353/
fetcher = Fetcher('/353/')
Task(fetcher.fetch())

loop()

任务通过向fetch生成器发送None来启动它。然后fetch运行,直到它产生一个期货,任务将其捕获为next_future。当套接字连接时,事件循环运行回调on_connected,该回调解析期货,该期货调用step,该回调恢复fetch

使用yield from分解协程

套接字连接后,我们发送HTTP GET请求并读取服务器响应。这些步骤不再需要分散在回调中;我们将它们收集到同一个生成器函数中。

    def fetch(self):
        # ... connection logic from above, then:
        sock.send(request.encode('ascii'))

        while True:
            f = Future()

            def on_readable():
                f.set_result(sock.recv(4096))

            selector.register(sock.fileno(),
                              EVENT_READ,
                              on_readable)
            chunk = yield f
            selector.unregister(sock.fileno())
            if chunk:
                self.response += chunk
            else:
                # Done reading.
                break

这段代码从套接字读取整个消息,看起来通常很有用。我们如何将其从fetch分解到子程序中?现在Python 3著名的yield from登场了。它允许一个生成器委托给另一个生成器。

要了解如何做到这一点,让我们回到我们简单的生成器示例。

>>> def gen_fn():
...     result = yield 1
...     print('result of yield: {}'.format(result))
...     result2 = yield 2
...     print('result of 2nd yield: {}'.format(result2))
...     return 'done'
...     

要从另一个生成器调用此生成器,请使用yield from委托给它。

>>> # Generator function:
>>> def caller_fn():
...     gen = gen_fn()
...     rv = yield from gen
...     print('return value of yield-from: {}'
...           .format(rv))
...
>>> # Make a generator from the
>>> # generator function.
>>> caller = caller_fn()

caller生成器表现得好像它是gen,它委托给的生成器。

>>> caller.send(None)
1
>>> caller.gi_frame.f_lasti
15
>>> caller.send('hello')
result of yield: hello
2
>>> caller.gi_frame.f_lasti  # Hasn't advanced.
15
>>> caller.send('goodbye')
result of 2nd yield: goodbye
return value of yield-from: done
Traceback (most recent call last):
  File "<input>", line 1, in <module>
StopIteration

callergen中产出值时,caller不会前进。请注意,即使内部生成器gen从一个yield语句前进到下一个,它的指令指针仍然停留在15处,也就是它的yield from语句所在的位置。10caller外部来看,我们无法判断它产出的值是来自caller还是来自它委托的生成器。并且在gen内部,我们无法判断发送进来的值是来自caller还是来自外部。yield from语句是一个无摩擦的通道,值通过它流进流出gen,直到gen完成。

协程可以使用yield from将工作委托给子协程,并接收工作的结果。请注意,在上面,caller打印了“yield-from的返回值:done”。当gen完成时,它的返回值变成了calleryield from语句的值。

    rv = yield from gen

早些时候,当我们批评基于回调的异步编程时,我们最强烈的不满在于“栈撕裂”:当回调抛出异常时,栈跟踪通常毫无用处。它只显示事件循环正在运行回调,而不是为什么。协程的表现如何呢?

>>> def gen_fn():
...     raise Exception('my error')
>>> caller = caller_fn()
>>> caller.send(None)
Traceback (most recent call last):
  File "<input>", line 1, in <module>
  File "<input>", line 3, in caller_fn
  File "<input>", line 2, in gen_fn
Exception: my error

这更有用!栈跟踪显示caller_fn在委托给gen_fn时抛出了错误。更令人欣慰的是,我们可以用异常处理程序包装对子协程的调用,就像普通的子程序一样。

>>> def gen_fn():
...     yield 1
...     raise Exception('uh oh')
...
>>> def caller_fn():
...     try:
...         yield from gen_fn()
...     except Exception as exc:
...         print('caught {}'.format(exc))
...
>>> caller = caller_fn()
>>> caller.send(None)
1
>>> caller.send('hello')
caught uh oh

因此,我们用子协程来分解逻辑,就像用普通子程序一样。让我们从我们的获取器中分解一些有用的子协程。我们编写一个read协程来接收一个块。

def read(sock):
    f = Future()

    def on_readable():
        f.set_result(sock.recv(4096))

    selector.register(sock.fileno(), EVENT_READ, on_readable)
    chunk = yield f  # Read one chunk.
    selector.unregister(sock.fileno())
    return chunk

我们基于read构建一个read_all协程,它接收整个消息。

def read_all(sock):
    response = []
    # Read whole response.
    chunk = yield from read(sock)
    while chunk:
        response.append(chunk)
        chunk = yield from read(sock)

    return b''.join(response)

如果你以正确的方式眯着眼睛看,yield from语句就会消失,这些看起来就像执行阻塞I/O的常规函数。但实际上,readread_all是协程。从read产出暂停read_all,直到I/O完成。当read_all暂停时,asyncio的事件循环执行其他工作并等待其他I/O事件;一旦其事件就绪,read_all将在下一个循环周期内使用read的结果恢复。

在栈的根部,fetch调用read_all

class Fetcher:
    def fetch(self):
         # ... connection logic from above, then:
        sock.send(request.encode('ascii'))
        self.response = yield from read_all(sock)

奇迹般地,Task类不需要修改。它驱动外部fetch协程的方式与之前完全相同。

Task(fetcher.fetch())
loop()

read产出一个future时,任务通过yield from语句的通道接收它,就像future直接从fetch产出一样。当循环解析future时,任务将其结果发送到fetch,并且read接收该值,就像任务直接驱动read一样。

Figure 5.3 - Yield From

图5.3 - Yield From

为了完善我们的协程实现,我们解决了一个小问题:我们的代码在等待future时使用yield,但在委托给子协程时使用yield from。如果我们无论何时协程暂停都使用yield from,会更完善。这样,协程就不需要关心它等待的是什么类型的对象。

我们利用了Python中生成器和迭代器之间的深度对应关系。从调用者的角度来看,推进生成器与推进迭代器相同。因此,我们通过实现一个特殊方法使我们的Future类可迭代。

    # Method on Future class.
    def __iter__(self):
        # Tell Task to resume me here.
        yield self
        return self.result

future的__iter__方法是一个产出future本身的协程。现在,当我们用以下代码替换这段代码时

# f is a Future.
yield f

...用这个

# f is a Future.
yield from f

...结果是一样的!驱动任务从其对send的调用中接收future,并且当future被解析时,它将新结果发送回协程。

在任何地方使用yield from的优势是什么?为什么这比用yield等待future并用yield from委托给子协程更好?它更好是因为现在,方法可以自由地更改其实现而不会影响调用者:它可能是一个返回future的普通方法,该future将解析为一个值,或者它可能是一个包含yield from语句并返回一个值的协程。在任何一种情况下,调用者只需要yield from该方法即可等待结果。

亲爱的读者,我们已经完成了对asyncio中协程的愉快阐述。我们深入研究了生成器的机制,并概述了future和task的实现。我们概述了asyncio如何获得两全其美:比线程更有效率、比回调更易读的并发I/O。当然,真正的asyncio比我们的草图复杂得多。真正的框架解决了零拷贝I/O、公平调度、异常处理以及大量其他特性。

对于asyncio用户来说,使用协程进行编码比你在这里看到的简单得多。在上面的代码中,我们从第一原理实现了协程,所以你看到了回调、任务和future。你甚至看到了非阻塞套接字和对select的调用。但是,当要使用asyncio构建应用程序时,你的代码中不会出现这些内容。正如我们承诺的那样,你现在可以流畅地获取URL。

    @asyncio.coroutine
    def fetch(self, url):
        response = yield from self.session.get(url)
        body = yield from response.read()

对这次阐述感到满意,我们回到最初的任务:使用asyncio编写一个异步网络爬虫。

协程的协调

我们首先描述了我们希望爬虫如何工作。现在是时候用asyncio协程来实现了。

我们的爬虫将获取第一个页面,解析其链接,并将它们添加到队列中。在此之后,它会在整个网站上扩展,并发地获取页面。但是为了限制客户端和服务器上的负载,我们希望运行一定数量的最大工作线程,而不是更多。每当一个工作线程完成获取页面后,它应该立即从队列中拉取下一个链接。我们将经历一段时间,在此期间没有足够的工作供所有人使用,因此一些工作线程必须暂停。但是当一个工作线程遇到一个包含大量新链接的页面时,队列会突然增长,任何暂停的工作线程都应该醒来并开始工作。最后,我们的程序必须在完成工作后退出。

想象一下,如果工作线程是线程。我们将如何表达爬虫的算法?我们可以使用Python标准库中的同步队列11。每次将项目放入队列时,队列都会增加其“任务”计数。工作线程在完成项目工作后调用task_done。主线程在Queue.join上阻塞,直到放入队列中的每个项目都与一个task_done调用匹配,然后它退出。

协程使用与asyncio队列完全相同的模式!首先,我们导入它12

try:
    from asyncio import JoinableQueue as Queue
except ImportError:
    # In Python 3.5, asyncio.JoinableQueue is
    # merged into Queue.
    from asyncio import Queue

我们将工作线程共享的状态收集到一个爬虫类中,并在其crawl方法中编写主要逻辑。我们在协程上启动crawl并运行asyncio的事件循环,直到crawl完成。

loop = asyncio.get_event_loop()

crawler = crawling.Crawler('http://xkcd.com',
                           max_redirect=10)

loop.run_until_complete(crawler.crawl())

爬虫从一个根URL和max_redirect开始,它表示它愿意跟踪以获取任何一个URL的重定向次数。它将(URL, max_redirect)对放入队列。(至于原因,请继续关注。)

class Crawler:
    def __init__(self, root_url, max_redirect):
        self.max_tasks = 10
        self.max_redirect = max_redirect
        self.q = Queue()
        self.seen_urls = set()

        # aiohttp's ClientSession does connection pooling and
        # HTTP keep-alives for us.
        self.session = aiohttp.ClientSession(loop=loop)

        # Put (URL, max_redirect) in the queue.
        self.q.put((root_url, self.max_redirect))

队列中未完成的任务数量现在为1。回到我们的主脚本中,我们启动事件循环和crawl方法。

loop.run_until_complete(crawler.crawl())

crawl协程启动工作线程。它就像一个主线程:它在join上阻塞,直到所有任务都完成,而工作线程在后台运行。

    @asyncio.coroutine
    def crawl(self):
        """Run the crawler until all work is done."""
        workers = [asyncio.Task(self.work())
                   for _ in range(self.max_tasks)]

        # When all work is done, exit.
        yield from self.q.join()
        for w in workers:
            w.cancel()

如果工作线程是线程,我们可能不希望一次启动所有线程。为了避免在确定线程是必需的之前创建昂贵的线程,线程池通常按需增长。但是协程很便宜,因此我们只需启动允许的最大数量即可。

有趣的是,我们是如何关闭爬虫的。当join future解析时,工作线程任务是存活的但已暂停:它们等待更多URL,但没有URL出现。因此,主协程在退出之前取消它们。否则,当Python解释器关闭并调用所有对象的析构函数时,存活的任务会发出哀嚎。

ERROR:asyncio:Task was destroyed but it is pending!

那么cancel是如何工作的呢?生成器有一个我们还没有向你展示的功能。你可以从外部向生成器抛出异常。

>>> gen = gen_fn()
>>> gen.send(None)  # Start the generator as usual.
1
>>> gen.throw(Exception('error'))
Traceback (most recent call last):
  File "<input>", line 3, in <module>
  File "<input>", line 2, in gen_fn
Exception: error

生成器由throw恢复,但它现在正在引发异常。如果生成器调用栈中的任何代码都没有捕获它,则异常会冒泡到顶部。因此,要取消任务的协程

    # Method of Task class.
    def cancel(self):
        self.coro.throw(CancelledError)

无论生成器在哪个yield from语句处暂停,它都会恢复并抛出异常。我们在任务的step方法中处理取消。

    # Method of Task class.
    def step(self, future):
        try:
            next_future = self.coro.send(future.result)
        except CancelledError:
            self.cancelled = True
            return
        except StopIteration:
            return

        next_future.add_done_callback(self.step)

现在任务知道它被取消了,因此当它被销毁时,它不会对光之将逝而愤怒。

一旦crawl取消了工作线程,它就会退出。事件循环看到协程已完成(我们稍后将看到如何完成),它也会退出。

loop.run_until_complete(crawler.crawl())

crawl方法包含主协程必须执行的所有操作。正是工作线程从队列中获取URL,获取它们,并解析它们以获取新链接。每个工作线程独立运行work协程。

    @asyncio.coroutine
    def work(self):
        while True:
            url, max_redirect = yield from self.q.get()

            # Download page and add new links to self.q.
            yield from self.fetch(url, max_redirect)
            self.q.task_done()

Python看到此代码包含yield from语句,并将其编译成一个生成器函数。因此,在crawl中,当主协程调用self.work十次时,它实际上并没有执行此方法:它只创建了十个具有对这段代码的引用的生成器对象。它将每个对象包装在Task中。Task接收生成器产出的每个future,并在future解析时通过使用每个future的结果调用send来驱动生成器。因为生成器有自己的栈帧,所以它们独立运行,具有单独的局部变量和指令指针。

工作线程通过队列与其同伴协调。它使用以下方法等待新的URL:

    url, max_redirect = yield from self.q.get()

队列的get方法本身就是一个协程:它会暂停,直到有人将项目放入队列,然后恢复并返回该项目。

顺便说一句,这就是在爬虫结束时,当主协程取消它时,工作线程将被暂停的地方。从协程的角度来看,当yield from引发CancelledError时,它的最后一次循环结束。

当一个工作线程获取页面时,它会解析链接并将新的链接放入队列,然后调用task_done以递减计数器。最终,一个工作线程获取一个其所有URL都已获取的页面,并且队列中也没有剩余工作。因此,此工作线程对task_done的调用会将计数器递减到零。然后,正在等待队列的join方法的crawl被解除暂停并完成。

我们承诺解释为什么队列中的项目是成对的,例如

# URL to fetch, and the number of redirects left.
('http://xkcd.com/353', 10)

新的URL有十次重定向剩余。获取此特定URL会导致重定向到一个带有尾部斜杠的新位置。我们减少了剩余的重定向次数,并将下一个位置放入队列。

# URL with a trailing slash. Nine redirects left.
('http://xkcd.com/353/', 9)

我们使用的aiohttp包默认情况下会跟踪重定向并提供最终响应。但是,我们告诉它不要这样做,并在爬虫中处理重定向,以便它可以合并导致相同目标的重定向路径:如果我们已经看到此URL,它位于self.seen_urls中,并且我们已经从不同的入口点开始处理此路径。

Figure 5.4 - Redirects

图5.4 - 重定向

爬虫获取“foo”并看到它重定向到“baz”,因此它将“baz”添加到队列和seen_urls中。如果它获取的下一个页面是“bar”,它也重定向到“baz”,则获取器不会再次将“baz”放入队列。如果响应是页面而不是重定向,则fetch会解析其链接并将新的链接放入队列。

    @asyncio.coroutine
    def fetch(self, url, max_redirect):
        # Handle redirects ourselves.
        response = yield from self.session.get(
            url, allow_redirects=False)

        try:
            if is_redirect(response):
                if max_redirect > 0:
                    next_url = response.headers['location']
                    if next_url in self.seen_urls:
                        # We have been down this path before.
                        return

                    # Remember we have seen this URL.
                    self.seen_urls.add(next_url)

                    # Follow the redirect. One less redirect remains.
                    self.q.put_nowait((next_url, max_redirect - 1))
             else:
                 links = yield from self.parse_links(response)
                 # Python set-logic:
                 for link in links.difference(self.seen_urls):
                    self.q.put_nowait((link, self.max_redirect))
                self.seen_urls.update(links)
        finally:
            # Return connection to pool.
            yield from response.release()

如果这是多线程代码,它将充斥着竞争条件。例如,工作线程检查链接是否在seen_urls中,如果不在,则工作线程将其放入队列并将其添加到seen_urls中。如果它在这两个操作之间被中断,那么另一个工作线程可能会从不同的页面解析相同的链接,也观察到它不在seen_urls中,并且也将其添加到队列中。现在同一个链接在队列中出现了两次,导致(充其量)重复工作和错误的统计数据。

然而,协程只在yield from语句处容易受到中断。这是一个关键的区别,它使得协程代码比多线程代码更不容易出现竞争条件:多线程代码必须通过获取锁显式地进入临界区,否则它是可以被中断的。Python 协程默认情况下是不可中断的,并且只在显式地让出控制权时才会让出控制权。

我们不再需要像基于回调的程序中那样使用 fetcher 类。该类是对回调缺陷的一种解决方法:它们需要一些地方来存储状态,以便在等待 I/O 时使用,因为它们的局部变量在跨调用时不会被保留。但是fetch协程可以像普通函数一样在其局部变量中存储其状态,因此不再需要类。

fetch完成处理服务器响应时,它会返回给调用者workwork方法在队列上调用task_done,然后从队列中获取下一个要获取的 URL。

fetch将新的链接放入队列时,它会增加未完成任务的数量,并使正在等待q.join的主协程保持暂停状态。但是,如果没有未查看的链接,并且这是队列中的最后一个 URL,那么当work调用task_done时,未完成任务的数量将降至零。该事件会解除join的暂停,并且主协程完成。

协调工作线程和主协程的队列代码如下所示13

class Queue:
    def __init__(self):
        self._join_future = Future()
        self._unfinished_tasks = 0
        # ... other initialization ...

    def put_nowait(self, item):
        self._unfinished_tasks += 1
        # ... store the item ...

    def task_done(self):
        self._unfinished_tasks -= 1
        if self._unfinished_tasks == 0:
            self._join_future.set_result(None)

    @asyncio.coroutine
    def join(self):
        if self._unfinished_tasks > 0:
            yield from self._join_future

主协程crawljoin中产生。因此,当最后一个工作线程将未完成任务的数量减少到零时,它会向crawl发出信号以恢复并完成。

旅程即将结束。我们的程序从调用crawl开始

loop.run_until_complete(self.crawler.crawl())

程序如何结束?由于crawl是一个生成器函数,调用它会返回一个生成器。为了驱动生成器,asyncio 将其包装在一个任务中

class EventLoop:
    def run_until_complete(self, coro):
        """Run until the coroutine is done."""
        task = Task(coro)
        task.add_done_callback(stop_callback)
        try:
            self.run_forever()
        except StopError:
            pass

class StopError(BaseException):
    """Raised to stop the event loop."""

def stop_callback(future):
    raise StopError

当任务完成后,它会引发StopError,循环将其用作已到达正常完成的信号。

但是这是什么?该任务有名为add_done_callbackresult的方法?你可能会认为任务类似于 future。你的直觉是正确的。我们必须承认一个关于 Task 类的细节,我们对你们隐瞒了:任务是一个 future。

class Task(Future):
    """A coroutine wrapped in a Future."""

通常,future 由其他人调用其上的set_result来解析。但是任务在协程停止时自行解析。请记住,在我们之前对 Python 生成器的探索中,当生成器返回时,它会抛出特殊的StopIteration异常

    # Method of class Task.
    def step(self, future):
        try:
            next_future = self.coro.send(future.result)
        except CancelledError:
            self.cancelled = True
            return
        except StopIteration as exc:

            # Task resolves itself with coro's return
            # value.
            self.set_result(exc.value)
            return

        next_future.add_done_callback(self.step)

因此,当事件循环调用task.add_done_callback(stop_callback)时,它准备被任务停止。以下是run_until_complete

    # Method of event loop.
    def run_until_complete(self, coro):
        task = Task(coro)
        task.add_done_callback(stop_callback)
        try:
            self.run_forever()
        except StopError:
            pass

当任务捕获StopIteration并自行解析时,回调会在循环内引发StopError。循环停止,调用堆栈被展开到run_until_complete。我们的程序完成了。

结论

现代程序越来越频繁地受 I/O 限制,而不是受 CPU 限制。对于此类程序,Python 线程是两全其美:全局解释器锁阻止它们实际并行执行计算,而抢占式切换使它们容易出现竞争条件。Async 通常是正确的模式。但是,随着基于回调的异步代码的增长,它往往会变得杂乱无章。协程是一个整洁的替代方案。它们自然地分解成子例程,具有合理的异常处理和堆栈跟踪。

如果我们眯起眼睛,使yield from语句模糊,协程看起来像一个执行传统阻塞 I/O 的线程。我们甚至可以使用多线程编程中的经典模式来协调协程。无需重新发明轮子。因此,与回调相比,协程对有经验的多线程程序员来说是一个诱人的习惯用法。

但是,当我们睁开眼睛并专注于yield from语句时,我们会看到它们标记了协程让出控制权并允许其他协程运行的点。与线程不同,协程显示了我们的代码在哪里可以被中断以及在哪里不能被中断。在格里夫·莱夫科维茨(Glyph Lefkowitz)发人深省的论文“Unyielding”14中写道:“线程使局部推理变得困难,而局部推理可能是软件开发中最重要的方面。”然而,显式地让出控制权使得“通过检查例程本身而不是检查整个系统来理解例程的行为(以及正确性)”成为可能。

本章是在 Python 和异步的历史复兴期间编写的。基于生成器的协程(您刚刚学习了其设计)于 2014 年 3 月在 Python 3.4 中的“asyncio”模块中发布。2015 年 9 月,Python 3.5 发布,其中包含内置于语言本身的协程。这些原生协程使用新的语法“async def”声明,并且不再使用“yield from”,而是使用新的“await”关键字委托给协程或等待 Future。

尽管取得了这些进步,但核心思想仍然存在。Python 的新的原生协程在语法上将与生成器不同,但工作方式非常相似;事实上,它们将在 Python 解释器中共享一个实现。Task、Future 和事件循环将继续在 asyncio 中发挥其作用。

现在您已经了解了 asyncio 协程的工作原理,您就可以在很大程度上忘记这些细节了。该机制隐藏在一个漂亮的界面后面。但是您对基础知识的掌握使您能够在现代异步环境中正确有效地编写代码。

  1. Guido 在PyCon 2013上介绍了标准的 asyncio 库,当时称为“Tulip”。

  2. 即使是调用send也可能阻塞,如果接收者对未决消息的确认速度很慢,并且系统发送数据的缓冲区已满。

  3. http://www.kegel.com/c10k.html

  4. 无论如何,Python 的全局解释器锁都禁止在一个进程中并行运行 Python 代码。在 Python 中并行化 CPU 密集型算法需要多个进程,或者用 C 编写代码的并行部分。但那是以后再讨论的话题。

  5. Jesse 在"什么是 Async、它是如何工作的以及我何时应该使用它?"中列出了使用异步的迹象和禁忌症。Mike Bayer 在"异步 Python 和数据库"中比较了 asyncio 和多线程在不同工作负载下的吞吐量。

  6. 有关此问题的复杂解决方案,请参阅https://tornado.pythonlang.cn/en/stable/stack_context.html

  7. @asyncio.coroutine装饰器不是魔术。事实上,如果它装饰了一个生成器函数并且未设置PYTHONASYNCIODEBUG环境变量,则装饰器几乎什么也不做。它只是为框架的其他部分的方便设置了一个属性_is_coroutine。完全可以使用未用@asyncio.coroutine装饰的裸生成器来使用 asyncio。

  8. Python 3.5 的内置协程在PEP 492“使用 async 和 await 语法的协程”中进行了描述。

  9. 这个 future 存在许多缺陷。例如,一旦这个 future 被解析,产生它的协程应该立即恢复而不是暂停,但是用我们的代码它不会。请参阅 asyncio 的 Future 类以获取完整的实现。

  10. 事实上,这正是 CPython 中“yield from”的工作方式。函数在执行每个语句之前都会递增其指令指针。但是在外部生成器执行“yield from”之后,它会从其指令指针中减去 1 以使其自身固定在“yield from”语句处。然后它会将其结果传递给调用者。该循环会重复,直到内部生成器抛出StopIteration,此时外部生成器最终允许自身前进到下一条指令。

  11. https://docs.pythonlang.cn/3/library/queue.html

  12. https://docs.pythonlang.cn/3/library/asyncio-sync.html

  13. 实际的asyncio.Queue实现使用asyncio.Event代替此处显示的 Future。区别在于 Event 可以重置,而 Future 不能从已解析状态转换回挂起状态。

  14. https://glyph.twistedmatrix.com/2014/02/unyielding.html