并发是指一次处理多件事。
并行是指一次做多件事。
二者不同,但是有联系。
一个关于结构,一个关于执行。
并发用于制定方案,用来解决可能(但未必)并行的问题。
——Rob Pike
asyncio
包使用事件循环驱动的协程实现并发,这是 Python 中最大也是最具雄心壮志的库之一。
协程相对于线程和进程的优点是任务调度完全由用户自己把控,少了操作系统调度带来的开销,通过使用 yield from foo
,将当前协程暂停,控制权移交给事件循环手中,再去驱动其他协程。当 foo
协程运行完毕后,将结果返回给暂停的协程,将其恢复。通过这种方式,避开了由操作系统调度可能出现的问题,同时也能高效利用 CPU。
一个简单的例子
1 | import asyncio |
这个协程例子会以动画形式显示文本式旋转指针。
asyncio.Future 介绍
asyncio.Future
类与 concurrent.futures.Future
类的接口基本一致,不过实现方式不同,不可以互换。
在 asyncio 包中 BaseEventLoop.create_task(...)
方法接收一个协程,排定它的运行时间,然后返回一个 asyncio.Task
实例——也是 asyncio.Future
类的实例,因为 Task 是 Future 的子类,用于包装协程。这与调用 Exector.submit(...)
方法创建 concurrent.futures.Future
实例是一个道理。
与 concurrent.futures.Future
类似,asyncio.Future
类也提供了 .done()
、.add_done_callback(...)
和 .result()
等方法。前两个方法与 concurrent.futures
中介绍的一样,.result()
方法差别却很大。
asyncui.Future
类的 .result()
方法没有参数,因此不能指定超过时间。此外,如果调用 .result()
方法时 future
还没运行完毕,那么 .result()
方法不会阻塞去等待结果,而是抛出 asyncio.InvalidStateError
异常。
此外,获取 asyncio.Future
对象的结果通常使用 yield from
,从中产出结果。
注意,使用 yield from
处理 future
与使用 add_done_callback
方法处理协程的作用一样:延迟的操作结束后,事件循环不会触发回调对象,而是设置 future
的返回值;而 yield from
表达式则在暂停的协程中生成返回值,恢复执行协程。
因为 asyncio.Future
类的目的时与 yield from
一起使用,所以通常不需要使用以下方法。
- 无需调用
my_future.add_done_callback(...)
,因为可以直接把想在future
运行后执行的操作放在协程中yield from my_future
表达式的后面。 - 无需调用
my_future.result()
,因为yield from
从future
中产出的值就是结果。
包装协程
在 asyncio 包中,future 和协程关系紧密,因为可以使用 yield from
从 asyncio.Future
对象中产出结果。这意味着,如果 foo
是协程函数(调用后返回协程对象),抑或者是返回 Future
或 Task
实例的普通函数,那么可以这样写:res = yield from foo()
。这是 asyncio
包的 API 中很多地方可以互换协程与 future 的原因之一。
为了执行这些操作,必须排定协程的运行时间,然后使用 asyncio.Task
对象包装协程。对协程来说,获取 Task 对象有两种主要方式。
asyncio.async(coro_or_future, *, loop=None)
这个函数统一了协程和 future:第一个参数可以是二者中的任何一个。如果是 Future 或者 Task 对象,那就原封不动地返回。如果是协程,那么async
函数会调用loop.create_task(...)
方法创建 Task 对象。loop=
关键字是可选的,用于传入事件循环;如果没有传入,那么 async 函数会通过调用asyncio.get_event_loop()
函数来获取循环对象BaseEventLoop.create_task(coro)
这个方法排定协程的执行时间,返回一个asyncio.Task
对象。
asyncio 包中有多个函数会自动(内部使用 asyncio.async
函数)把参数指定的协程包装在 asyncio.Task
对象中,例如 BaseEventLoop.run_until_complete(...)
方法。
使用 asyncio 和 aiohttp 包下载
1 | import os |
以上的程序未经测试,仅作示例!
有几点要说的:
- 标准库中的
requests
库是阻塞IO,不支持异步,这里我们要使用aiohttp
库 - 所有用到
yield from
句法的函数都必须是协程,需要用 `@asyncio.coroutine` 装饰
asyncio.wait
协程的参数是一个由 future
或协程构成的可迭代对象;wait
会分别把各个协程包装进一个 Task
对象。最终的结果是,wait
处理的所有对象都通过某种方式变成了 Future
类的实例。wait
是协程函数,因此返回的是一个协程或生成器对象;wait_coro
变量中存储的正是这种对象。为了驱动协程,我们把协程传给 loop.run_until_complete(...)
方法。
loop.run_until_complete
方法的参数是一个 future 或协程。如果是协程,run_until_complete
方法与 wait 函数一样,把协程包装进一个 Task
对象中。协程、future 和任务都能由 yield from
驱动,这正是 run_until_complete
方法对 wait 函数返回的 wait_coro
对象所做的事。wait_coro
运行结束后返回一个元组,第一个元素是一系列结束的 future,第二个元素是一系列未结束的 future。
使用 asyncio.as_completed 函数
1 | import asyncio |
这个例程是下载某个网站上的图标的异步版本。
通过使用 asyncio.as_completed
函数来分别处理协程,使进度条库 tqdm
被成功集成到程序中。关于上面的这个例程有几点可以谈的。
FetchError
类是我们手动包装的一个异常,并且获取了产生异常的协程的 country codeget_flag
协程使用aiohttp
包来异步获取图标,并且对 http 错误进行了处理download_one
协程通过使用asyncio.Semaphore
类对象来限制并发请求数量。在yield from
表达式中将semaphore
当成上下文管理器来使用,防止阻塞整个系统:如果 semaphore 计数器的值是所允许的最大值,只有这个协程会阻塞。退出with
语句后,semaphore 计数器的值会递减。- 因为使用了
as_completed
函数,我们不能像之前一样利用一个字典来获取错误国家的信息。本例中,我们自定义了一个错误,用来在发生错误时包装国家信息并返回。在as_completed
的调用函数中捕获错误,并提取出获取图标错误的国家的信息。
对于我们使用的异步网络客户端来说,一定要使用某种限流机制,防止向服务器发起太多并发请求,因为如果服务器过载,那么系统的整体性能可能会下降。
在本脚本我们的做法是,在 downloader_coro
函数中创建一个 asyncio.Semaphore
实例,然后把它传递给 download_one
函数的 semaphore
参数。
Semaphore 对象维护着一个内部计数器,若在对象上调用 .acquire()
协程方法,计数器递减;调用 .release()
协程方法,计数器递增。计数器的初始值在实例化 Semaphore 时指定,如下所示:
1 | semaphore = asyncio.Semaphore(concur_req) |
如果计数器大于零,那么调用 .acquire()
方法不会阻塞;如果计数器为零, .acquire()
方法会阻塞调用这个方法的协程,直到其他协程在同一个 Semaphore 对象上调用 .release()
方法,让计数器递增。
本脚本中没有使用 .acquire()
或 .release()
方法,而是通过 with
语句将 semaphore 当作上下文管理器使用:
1 | with (yield from semaphore): |
这段代码保证,任何时候都不会有超过 concur_req
个 get_flag
协程启动。
使用 Executor 对象,防止阻塞事件循环
文件 I/O 会产生阻塞,在上面的示例中,关于 save_flag
函数的保存到文件操作我们一直没有关注。事实上,这个函数很可能阻塞。
在线程版本中,我们不需要关心太多文件阻塞,因为阻塞的只是众多工作线程的一个。阻塞型 I/O 调用在背后会释放 GIL,因此另一个线程可以继续。但是在异步版本中,只有一个线程工作,函数调用由事件循环驱动,save_flag
函数会阻塞住唯一线程。因此保存文件时,整个应用程序都会冻结。这个问题的解决办法是,使用事件循环对象的 run_in_executor
方法。
asyncio
的事件循环在背后维护着一个 ThreadPoolExecutor
对象,我们可以调用 run_in_executor
方法,把可调用的对象发给它执行。若想在示例中使用这个功能,download_one
协程只有几行代码需要改动:
1 |
|
- 获取事件循环对象的引用
- 调用
run_in_executor
方法,第一个参数是Executor
实例;如果设为None
,使用事件循环的默认ThreadPoolExecutor
实例。余下的参数是可调用对象及其位置参数
async 与 await 关键字
为了简化和更好的标识异步IO,从 Python3.5 起引入了两个关键字 async
和 await
,可以让 coroutine 的代码更易读。
请注意,async
和 await
是针对 coroutine 的新语法,要使用仅需做两步简单的替换:
- 把 `@asyncio.coroutine
替换成
async` - 把
yield from
替换成await
下面是一个简单的示例:
1 | import asyncio |
替换成
1 | import asyncio |
就可以了,代码的清晰度也大大提升了。
关于 asyncio 的介绍就到这里了,文章参考了 《流畅的Python》 很多地方。在我的心里,这本书就相当于 Java 中的 《Thinking in Java》,十分适合中级 Python 程序员阅读。不过,如果你没有在 Python 方面深造的计划的话,就不用看了。等看完这本书两遍后,我会专门写一篇博客来介绍这一本书。