Python asyncio模块

并发是指一次处理多件事。
并行是指一次做多件事。
二者不同,但是有联系。
一个关于结构,一个关于执行。
并发用于制定方案,用来解决可能(但未必)并行的问题。
——Rob Pike

asyncio 包使用事件循环驱动的协程实现并发,这是 Python 中最大也是最具雄心壮志的库之一。

协程相对于线程和进程的优点是任务调度完全由用户自己把控,少了操作系统调度带来的开销,通过使用 yield from foo,将当前协程暂停,控制权移交给事件循环手中,再去驱动其他协程。当 foo 协程运行完毕后,将结果返回给暂停的协程,将其恢复。通过这种方式,避开了由操作系统调度可能出现的问题,同时也能高效利用 CPU。

一个简单的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import asyncio
import itertools
import sys


@asyncio.coroutine
def spin(msg):
write, flush = sys.stdout.write, sys.stdout.flush
for char in itertools.cycle('|/-\\|'):
status = char + ' ' + msg
write(status)
flush()
try:
yield from asyncio.sleep(.1)
except asyncio.CancelledError:
break
write('\r')
write('\r')


@asyncio.coroutine
def slow_function():
# 假装等待 I/O 一段时间
yield from asyncio.sleep(3)
return 42


@asyncio.coroutine
def supervisor():
spinner = asyncio.async(spin('thinking!'))
print('spinner object:', spinner)
result = yield from slow_function()
spinner.cancel()
return result


def main():
loop = asyncio.get_event_loop()
result = loop.run_until_complete(supervisor())
loop.close()
print('Answer:', result)


if __name__ == '__main__':
main()

这个协程例子会以动画形式显示文本式旋转指针。

asyncio.Future 介绍

asyncio.Future 类与 concurrent.futures.Future 类的接口基本一致,不过实现方式不同,不可以互换。

在 asyncio 包中 BaseEventLoop.create_task(...) 方法接收一个协程,排定它的运行时间,然后返回一个 asyncio.Task 实例——也是 asyncio.Future 类的实例,因为 Task 是 Future 的子类,用于包装协程。这与调用 Exector.submit(...) 方法创建 concurrent.futures.Future 实例是一个道理。

concurrent.futures.Future 类似,asyncio.Future 类也提供了 .done().add_done_callback(...).result() 等方法。前两个方法与 concurrent.futures 中介绍的一样,.result() 方法差别却很大。

asyncui.Future 类的 .result() 方法没有参数,因此不能指定超过时间。此外,如果调用 .result() 方法时 future 还没运行完毕,那么 .result() 方法不会阻塞去等待结果,而是抛出 asyncio.InvalidStateError 异常。

此外,获取 asyncio.Future 对象的结果通常使用 yield from,从中产出结果。

注意,使用 yield from 处理 future 与使用 add_done_callback 方法处理协程的作用一样:延迟的操作结束后,事件循环不会触发回调对象,而是设置 future 的返回值;而 yield from 表达式则在暂停的协程中生成返回值,恢复执行协程。

因为 asyncio.Future 类的目的时与 yield from 一起使用,所以通常不需要使用以下方法。

  • 无需调用 my_future.add_done_callback(...),因为可以直接把想在 future 运行后执行的操作放在协程中 yield from my_future 表达式的后面。
  • 无需调用 my_future.result(),因为 yield fromfuture 中产出的值就是结果。

包装协程

在 asyncio 包中,future 和协程关系紧密,因为可以使用 yield fromasyncio.Future 对象中产出结果。这意味着,如果 foo 是协程函数(调用后返回协程对象),抑或者是返回 FutureTask 实例的普通函数,那么可以这样写:res = yield from foo()。这是 asyncio 包的 API 中很多地方可以互换协程与 future 的原因之一。

为了执行这些操作,必须排定协程的运行时间,然后使用 asyncio.Task 对象包装协程。对协程来说,获取 Task 对象有两种主要方式。

  • asyncio.async(coro_or_future, *, loop=None) 这个函数统一了协程和 future:第一个参数可以是二者中的任何一个。如果是 Future 或者 Task 对象,那就原封不动地返回。如果是协程,那么 async 函数会调用 loop.create_task(...) 方法创建 Task 对象。loop= 关键字是可选的,用于传入事件循环;如果没有传入,那么 async 函数会通过调用 asyncio.get_event_loop() 函数来获取循环对象
  • BaseEventLoop.create_task(coro) 这个方法排定协程的执行时间,返回一个 asyncio.Task 对象。

asyncio 包中有多个函数会自动(内部使用 asyncio.async 函数)把参数指定的协程包装在 asyncio.Task 对象中,例如 BaseEventLoop.run_until_complete(...) 方法。

使用 asyncio 和 aiohttp 包下载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import os
import sys
import time
import asyncio

import aiohttp

BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = 'downloads'


def save_flag(img, filename):
path = os.path.join(DEST_DIR, filename)
with open(path, 'wb') as fp:
fp.write(img)


def show(text):
print(text, end=' ')
sys.stdout.flush()


@asyncio.coroutine
def get_flag(cc):
url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
resp = yield from aiohttp.request('GET', url)
image = yield from resp.read()
return image


@asyncio.coroutine
def download_one(cc):
image = yield from get_flag(cc)
show(cc)
save_flag(image, cc.lower() + '.gif')
return cc


def download_many(cc_list):
loop = asyncio.get_event_loop()
to_do = [download_one(cc) for cc in sorted(cc_list)]
wait_coro = asyncio.wait(to_do)
res, _ = loop.run_until_complete(wait_coro)
loop.close()
return len(res)


def main(download_many):
t0 = time.time()
count = download_many(POP20_CC)
elapsed = time.time() - t0
msg = '\n{} flags downloaded in {:.2f}s'
print(msg.format(count, elapsed))


if __name__ == '__main__':
main(download_many)

以上的程序未经测试,仅作示例!

有几点要说的:

  • 标准库中的 requests 库是阻塞IO,不支持异步,这里我们要使用 aiohttp
  • 所有用到 yield from 句法的函数都必须是协程,需要用 `@asyncio.coroutine` 装饰

asyncio.wait 协程的参数是一个由 future 或协程构成的可迭代对象;wait 会分别把各个协程包装进一个 Task 对象。最终的结果是,wait 处理的所有对象都通过某种方式变成了 Future 类的实例。wait 是协程函数,因此返回的是一个协程或生成器对象;wait_coro 变量中存储的正是这种对象。为了驱动协程,我们把协程传给 loop.run_until_complete(...) 方法。

loop.run_until_complete 方法的参数是一个 future 或协程。如果是协程,run_until_complete 方法与 wait 函数一样,把协程包装进一个 Task 对象中。协程、future 和任务都能由 yield from 驱动,这正是 run_until_complete 方法对 wait 函数返回的 wait_coro 对象所做的事。wait_coro 运行结束后返回一个元组,第一个元素是一系列结束的 future,第二个元素是一系列未结束的 future。

使用 asyncio.as_completed 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import asyncio
import collections

import aiohttp
from aiohttp import web
import tqdm

DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000
Result = collections.namedtuple('Result', 'status cc')


class FetchError(Exception):
def __init__(self, country_code):
self.country_code = country_code


@asyncio.coroutine
def get_flag(base_url, cc):
url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
resp = yield from aiohttp.request('GET', url)
if resp.status == 200:
image = yield from resp.read()
return image
elif resp.status == 404:
raise web.HTTPNotFound()
else:
raise aiohttp.WebSocketError(code=resp.status, message=resp.reason)


@asyncio.coroutine
def download_one(cc, base_url, semaphore, verbose):
try:
with(yield from semaphore):
image = yield from get_flag(base_url, cc)
except web.HTTPNotFound:
status = 'not found'
msg = 'not found'
except Exception as exc:
raise FetchError(cc) from exc
else:
save_flag(image, cc.lower() + '.gif')
status = 'ok'
msg = 'ok'
if verbose and msg:
print(cc, msg)
return Result(status, cc)


@asyncio.coroutine
def downloader_coro(cc_list, base_url, verbose, concur_req):
counter = collections.Counter()
semaphore = asyncio.Semaphore(concur_req)
to_do = [download_one(cc, base_url, semaphore, verbose) for cc in sorted(cc_list)]
to_do_iter = asyncio.as_completed(to_do)
if not verbose:
to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))
for future in to_do_iter:
try:
res = yield from future
except FetchError as exc:
country_code = exc.country_code
try:
error_msg = exc.__cause__.args[0]
except IndexError:
error_msg = exc.__cause__.__class__.__name__
if verbose and error_msg:
msg = '*** Error for {}: {}'
print(msg.format(country_code, error_msg))
status = 'error'
else:
status = res.status
counter[status] += 1
return counter


def download_many(cc_list, base_url, verbose, concur_req):
loop = asyncio.get_event_loop()
coro = downloader_coro(cc_list, base_url, verbose, concur_req)
counts = loop.run_until_complete(coro)
loop.close()
return counts


def save_flag(img, filename):
path = os.path.join(DEST_DIR, filename)
with open(path, 'wb') as fp:
fp.write(img)


def main(download_many):
t0 = time.time()
count = download_many(POP20_CC)
elapsed = time.time() - t0
msg = '\n{} flags downloaded in {:.2f}s'
print(msg.format(count, elapsed))


if __name__ == '__main__':
main(download_many)

这个例程是下载某个网站上的图标的异步版本。

通过使用 asyncio.as_completed 函数来分别处理协程,使进度条库 tqdm 被成功集成到程序中。关于上面的这个例程有几点可以谈的。

  • FetchError 类是我们手动包装的一个异常,并且获取了产生异常的协程的 country code
  • get_flag 协程使用 aiohttp 包来异步获取图标,并且对 http 错误进行了处理
  • download_one 协程通过使用 asyncio.Semaphore 类对象来限制并发请求数量。在 yield from 表达式中将 semaphore 当成上下文管理器来使用,防止阻塞整个系统:如果 semaphore 计数器的值是所允许的最大值,只有这个协程会阻塞。退出 with 语句后,semaphore 计数器的值会递减。
  • 因为使用了 as_completed 函数,我们不能像之前一样利用一个字典来获取错误国家的信息。本例中,我们自定义了一个错误,用来在发生错误时包装国家信息并返回。在 as_completed 的调用函数中捕获错误,并提取出获取图标错误的国家的信息。

对于我们使用的异步网络客户端来说,一定要使用某种限流机制,防止向服务器发起太多并发请求,因为如果服务器过载,那么系统的整体性能可能会下降。

在本脚本我们的做法是,在 downloader_coro 函数中创建一个 asyncio.Semaphore 实例,然后把它传递给 download_one 函数的 semaphore 参数。

Semaphore 对象维护着一个内部计数器,若在对象上调用 .acquire() 协程方法,计数器递减;调用 .release() 协程方法,计数器递增。计数器的初始值在实例化 Semaphore 时指定,如下所示:

1
semaphore = asyncio.Semaphore(concur_req)

如果计数器大于零,那么调用 .acquire() 方法不会阻塞;如果计数器为零, .acquire() 方法会阻塞调用这个方法的协程,直到其他协程在同一个 Semaphore 对象上调用 .release() 方法,让计数器递增。

本脚本中没有使用 .acquire().release() 方法,而是通过 with 语句将 semaphore 当作上下文管理器使用:

1
2
with (yield from semaphore):
image = yield from get_flag(base_url, cc)

这段代码保证,任何时候都不会有超过 concur_reqget_flag 协程启动。

使用 Executor 对象,防止阻塞事件循环

文件 I/O 会产生阻塞,在上面的示例中,关于 save_flag 函数的保存到文件操作我们一直没有关注。事实上,这个函数很可能阻塞。

在线程版本中,我们不需要关心太多文件阻塞,因为阻塞的只是众多工作线程的一个。阻塞型 I/O 调用在背后会释放 GIL,因此另一个线程可以继续。但是在异步版本中,只有一个线程工作,函数调用由事件循环驱动,save_flag 函数会阻塞住唯一线程。因此保存文件时,整个应用程序都会冻结。这个问题的解决办法是,使用事件循环对象的 run_in_executor 方法。

asyncio 的事件循环在背后维护着一个 ThreadPoolExecutor 对象,我们可以调用 run_in_executor 方法,把可调用的对象发给它执行。若想在示例中使用这个功能,download_one 协程只有几行代码需要改动:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@asyncio.coroutine
def download_one(cc, base_url, semaphore, verbose):
try:
with(yield from semaphore):
image = yield from get_flag(base_url, cc)
except web.HTTPNotFound:
status = 'not found'
msg = 'not found'
except Exception as exc:
raise FetchError(cc) from exc
else:
# 这部分进行了改动
loop = asyncio.get_event_loop()
loop.run_in_executor(None,
save_flag, image, cc.lower() + '.gif')
# 下面是原来的代码
# save_flag(image, cc.lower() + '.gif')
status = 'ok'
msg = 'ok'
if verbose and msg:
print(cc, msg)
return Result(status, cc)
  1. 获取事件循环对象的引用
  2. 调用 run_in_executor 方法,第一个参数是 Executor 实例;如果设为 None,使用事件循环的默认 ThreadPoolExecutor 实例。余下的参数是可调用对象及其位置参数

async 与 await 关键字

为了简化和更好的标识异步IO,从 Python3.5 起引入了两个关键字 asyncawait,可以让 coroutine 的代码更易读。

请注意,asyncawait 是针对 coroutine 的新语法,要使用仅需做两步简单的替换:

  1. `@asyncio.coroutine替换成async`
  2. yield from 替换成 await

下面是一个简单的示例:

1
2
3
4
5
6
7
import asyncio

@asyncio.coroutine
def hello()
print("Hello World!")
yield from asyncio.sleep(1)
print("Hello again")

替换成

1
2
3
4
5
6
import asyncio

async def hello()
print("Hello World")
await asyncio.sleep(1)
print("Hello again")

就可以了,代码的清晰度也大大提升了。

关于 asyncio 的介绍就到这里了,文章参考了 《流畅的Python》 很多地方。在我的心里,这本书就相当于 Java 中的 《Thinking in Java》,十分适合中级 Python 程序员阅读。不过,如果你没有在 Python 方面深造的计划的话,就不用看了。等看完这本书两遍后,我会专门写一篇博客来介绍这一本书。

-------------本文结束感谢阅读-------------
  • 本文标题:Python asyncio模块
  • 本文作者:xlui
  • 发布时间:2017年10月16日 - 22:10
  • 最后更新:2023年01月20日 - 01:01
  • 本文链接: https://xlui.me/t/python-asyncio/
  • 版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-ND 4.0 许可协议。转载请注明出处!