-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcoroutine.py
More file actions
106 lines (80 loc) · 2.36 KB
/
coroutine.py
File metadata and controls
106 lines (80 loc) · 2.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
###
#2# 我们来写一个简单的 Coroutine 框架。
#3# 虽然Python有Coroutine库。
#3# 但是,编程嘛,最主要的是开心!
###
import socket # on top of TCP
import time
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
# Module-level selector used by the event loop and the request coroutines
# in this file. A selector wraps the select-style system calls that watch
# file objects (here: non-blocking sockets) for I/O readiness.
# On Windows only sockets can be watched this way.
# DefaultSelector is the selectors-module alias for the most efficient
# implementation available on the current platform.
selector = DefaultSelector()
class Future:
    """Placeholder for an event that has not happened yet (akin to a Promise).

    Code that wants to be notified appends zero-argument callables to
    ``callbacks``; ``resolve`` runs them once the event has occurred.
    """

    def __init__(self):
        # Zero-argument callables to run when the future is resolved.
        self.callbacks = []

    def resolve(self):
        """Call every registered callback in registration order."""
        for callback in self.callbacks:
            callback()
class Task:
    """Drive a generator-based coroutine by calling ``next()`` on it.

    Every value the generator yields must expose a ``callbacks`` list (as
    ``Future`` does). The task appends its own ``step`` method to that list
    so the generator is resumed when the yielded object is resolved.

    Args:
        gen: The generator to drive.
        eventLoop: The loop that owns this task; its ``n_task`` counter is
            decremented when the generator finishes.
    """

    def __init__(self, gen, eventLoop):
        self.gen = gen
        # Remember the loop that was passed in so the task works with any
        # loop instance rather than depending on a module-level global.
        self.event_loop = eventLoop
        # Run the generator up to its first yield right away.
        self.step()

    def step(self):
        """Advance the generator to its next ``yield`` (or to completion)."""
        try:
            future = next(self.gen)
        except StopIteration as e:
            # The generator returned: the task is finished. Printing the
            # exception shows the generator's return value.
            self.event_loop.n_task -= 1
            print('--------------------------------------', 'Byte Received:', e, '\n\n')
            return
        # Resume this task when the yielded future is resolved.
        future.callbacks.append(self.step)
class EventLoop:
    """Minimal event loop: waits on the module-level selector and resolves
    the future attached to each ready file object until no tasks remain.
    """

    def __init__(self):
        # Number of tasks that have been added and not yet finished.
        self.n_task = 0

    def add_task(self, generator):
        """Wrap ``generator`` in a ``Task`` and count it as pending."""
        # Count first: constructing the Task immediately runs the
        # generator's first step, which may already finish the task.
        self.n_task += 1
        Task(generator, self)

    def start(self):
        """Block, dispatching I/O readiness events, while tasks are pending."""
        while self.n_task:
            # select() returns (key, event-mask) pairs; the Future stored at
            # registration time is carried in key.data.
            for key, _mask in selector.select():
                key.data.resolve()
def async_get(path, host='localhost', port=3000):
    """Generator coroutine that performs a non-blocking HTTP/1.0 GET.

    Before each socket operation it registers the socket with the
    module-level ``selector`` and yields a ``Future``; the driver is expected
    to resume the generator once that future has been resolved.

    Args:
        path: Request path placed on the GET line.
        host: Host to connect to (defaults to the original hard-coded value).
        port: TCP port to connect to (defaults to the original hard-coded
            value).

    Yields:
        Future: one per wait for the socket to become writable or readable.

    Returns:
        int: Length of the decoded response text (characters, not bytes).
    """
    s = socket.socket()
    fd = s.fileno()
    try:
        s.setblocking(False)
        try:
            s.connect((host, port))
        except BlockingIOError as e:
            # A non-blocking connect cannot complete immediately; completion
            # is detected below by waiting for the socket to become writable.
            print(e)
        request = 'GET %s HTTP/1.0\r\n\r\n' % path

        f = Future()
        selector.register(fd, EVENT_WRITE, data=f)
        yield f
        # The socket is writable.
        selector.unregister(fd)
        # NOTE(review): assumes the short request is written by a single
        # send() call; partial sends are not retried.
        s.send(request.encode())

        chunks = []
        while True:
            f = Future()
            selector.register(fd, EVENT_READ, data=f)
            yield f
            # The socket is readable.
            selector.unregister(fd)
            received = s.recv(1000)
            if not received:
                # An empty read means the peer closed the connection.
                break
            chunks.append(received)

        body = b''.join(chunks).decode()
        print('--------------------------------------')
        print(body)
        return len(body)
    finally:
        # Runs on normal completion, on error, and when the generator is
        # closed while suspended. Drop any registration that is still active,
        # then release the descriptor so sockets are not leaked.
        try:
            selector.unregister(fd)
        except KeyError:
            pass  # not registered at this point: nothing to undo
        s.close()
if __name__ == '__main__':
    # Demo: issue 20 concurrent requests on one thread and report the total
    # wall-clock time.
    start = time.time()
    # Keep this module-level name stable; other code in the file may refer
    # to it.
    eventLoop = EventLoop()
    for _ in range(20):
        eventLoop.add_task(async_get('/super-slow'))
    eventLoop.start()
    elapsed = time.time() - start
    print('%.1f sec' % elapsed)