-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathflags2_asyncio.py
More file actions
121 lines (105 loc) · 4.82 KB
/
flags2_asyncio.py
File metadata and controls
121 lines (105 loc) · 4.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import asyncio
import aiohttp
from aiohttp import web
import collections
import tqdm
from flag2_common import main, HTTPStatus, Result, save_flag
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000
class FetchError(Exception):
def __init__(self, contry_code):
self.contry_code = contry_code
async def get_flag(session, base_url, cc):
"""最底层的协程函数, 异步下载一个文件.
如果GET操作返回的状态是 200, 返回异步非阻塞式读取的结果;
如果状态是404, 抛出 aiohttp.web.HTTPNotFound 异常;
对于其他异常状态, 返回相应的 aiohttp.HttpProcessingError.
"""
url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
async with session.get(url) as resp:
if resp.status == 200:
return await resp.read()
elif resp.status == 404:
raise web.HTTPNotFound()
else:
raise aiohttp.errors.HttpProcessingError(
code=resp.status,
message=resp.version,
headers=resp.headers)
async def download_one(session, cc, base_url, semaphore, verbose):
"""委托生成器函数, 驱动 get_flag 协程函数
"""
try:
# semaphore 是一个 asyncio.Semaphore 的实例, 用来控制最大并发数
# 因为它是 asyncio 的对象, 需要在前面 加 async
# 等价于 with (yield from semaphore)
async with semaphore:
# 等价于 image = yield from get_flag(...)
image = await get_flag(session, base_url, cc)
# 退出上面的 with 语句时信号量减1, 使得某一个挂起的生成器函数重新运行
# 下面的部分处理了 get_flag 抛出的异常:
# 1.对于 aiohttp.web.HTTPNotFound, 吞掉它.
# 2.其他异常用 FetchError 异常类封装后向上抛出
except web.HTTPNotFound:
status = HTTPStatus.not_found
msg = 'not found'
except Exception as exc:
# 注意这里的 raise X from Y 语法
raise FetchError(cc) from exc
else:
# !! 注意到 文件写入操作也是IO操作. 目前没有异步非阻塞写入文件的库函数... 因此他会阻塞事件循环
# 解决方案: asyncio.run_in_executor(...), 这是事件循环中的线程池
"""
loop = asyncio.get_event_loop()
# run_in_executor(...) 的第一个参数接受一个executor的实例, 如果为None则会用默认的线程池
loop.run_in_executor(None, save_flag, image, cc.lower() + '.gif')
"""
save_flag(image, cc.lower() + '.gif')
status = HTTPStatus.ok
msg = 'OK'
if verbose and msg:
print(cc, msg)
return Result(status, cc)
async def downloader_coro(cc_list, base_url, verbose, concur_req):
counter = collections.Counter()
# 用信号量控制最大并发数
semaphore = asyncio.Semaphore(concur_req)
async with aiohttp.ClientSession() as session:
# 创建 to_do list, list 中的每一个元素是一个生成器函数
to_do = [download_one(session, cc, base_url, semaphore, verbose)\
for cc in sorted(cc_list)]
# 获得一个生成器函数, 每当有一个协程完成就返回一个future对象
to_do_iter = asyncio.as_completed(to_do)
if not verbose:
to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))
# 在 to_do_iter 已经完成的部分(它们都返回一个 future 对象)上迭代
for future in to_do_iter:
try:
# 等价于 res = yield from future
# 注意, 不要调用 future.result()
res = await future
# download_one 函数抛出的所有异常都是 FetchError 的实例
except FetchError as exc:
country_code = exc.contry_code
try:
# 试图从原始异常中获取错误信息(注意我们用了 yield X from Y 语法, 这里获取 Y 的错误信息)
error_msg = exc.__cause__.args[0]
except IndexError:
# 如果我们没能获得原是错误信息, 就用原始原始异常的名字作为错误信息
error_msg = exc.__caluse__.__class__.__name__
if verbose and error_msg:
msg = '*** Error for {}: {}'
print(msg.format(country_code, error_msg))
status = HTTPStatus.error
else:
status = res.status
counter[status] += 1
return counter
def download_many(cc_list, base_url, verbose, concur_req):
loop = asyncio.get_event_loop()
coro = downloader_coro(cc_list, base_url, verbose, concur_req)
counts = loop.run_until_complete(coro)
loop.close()
return counts
if __name__ == '__main__':
main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)