-
Notifications
You must be signed in to change notification settings - Fork 2.2k
Expand file tree
/
Copy pathscheduler.py
More file actions
143 lines (126 loc) · 4.89 KB
/
scheduler.py
File metadata and controls
143 lines (126 loc) · 4.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import time
import multiprocessing
from proxypool.processors.server import app
from proxypool.processors.getter import Getter
from proxypool.processors.tester import Tester
from proxypool.setting import APP_PROD_METHOD_GEVENT, APP_PROD_METHOD_MEINHELD, APP_PROD_METHOD_TORNADO, CYCLE_GETTER, CYCLE_TESTER, API_HOST, \
API_THREADED, API_PORT, ENABLE_SERVER, IS_PROD, APP_PROD_METHOD, \
ENABLE_GETTER, ENABLE_TESTER, IS_WINDOWS
from loguru import logger
# Frozen Windows executables need freeze_support() for multiprocessing.
if IS_WINDOWS:
    multiprocessing.freeze_support()

# Module-level handles to the worker processes; they stay None until
# Scheduler.run() creates the corresponding process.
tester_process = getter_process = server_process = None
class Scheduler():
    """
    scheduler

    Runs the tester, the getter and the API server, each in its own
    process, depending on the ENABLE_TESTER / ENABLE_GETTER / ENABLE_SERVER
    settings.
    """

    def run_tester(self, cycle=CYCLE_TESTER):
        """
        run tester

        Calls ``Tester.run()`` in an endless loop, sleeping ``cycle`` seconds
        after every round. Returns immediately when ENABLE_TESTER is falsy.

        :param cycle: seconds to sleep between two tester rounds
        """
        if not ENABLE_TESTER:
            logger.info('tester not enabled, exit')
            return
        tester = Tester()
        loop = 0
        while True:
            logger.debug(f'tester loop {loop} start...')
            tester.run()
            loop += 1
            time.sleep(cycle)

    def run_getter(self, cycle=CYCLE_GETTER):
        """
        run getter

        Calls ``Getter.run()`` in an endless loop, sleeping ``cycle`` seconds
        after every round. Returns immediately when ENABLE_GETTER is falsy.

        :param cycle: seconds to sleep between two getter rounds
        """
        if not ENABLE_GETTER:
            logger.info('getter not enabled, exit')
            return
        getter = Getter()
        loop = 0
        while True:
            logger.debug(f'getter loop {loop} start...')
            getter.run()
            loop += 1
            time.sleep(cycle)

    def run_server(self):
        """
        run server for api

        When IS_PROD is set, serves ``app`` with the backend selected by
        APP_PROD_METHOD (gevent, tornado or meinheld); the backend is imported
        lazily and an ImportError is logged instead of raised. Otherwise the
        app's own ``run()`` is used. Returns immediately when ENABLE_SERVER
        is falsy.
        """
        if not ENABLE_SERVER:
            logger.info('server not enabled, exit')
            return
        if IS_PROD:
            if APP_PROD_METHOD == APP_PROD_METHOD_GEVENT:
                try:
                    from gevent.pywsgi import WSGIServer
                except ImportError as e:
                    logger.exception(e)
                else:
                    http_server = WSGIServer((API_HOST, API_PORT), app)
                    http_server.serve_forever()
            elif APP_PROD_METHOD == APP_PROD_METHOD_TORNADO:
                try:
                    from tornado.wsgi import WSGIContainer
                    from tornado.httpserver import HTTPServer
                    from tornado.ioloop import IOLoop
                except ImportError as e:
                    logger.exception(e)
                else:
                    http_server = HTTPServer(WSGIContainer(app))
                    # NOTE(review): only the port is passed here, while the
                    # other branches also pass API_HOST - confirm intended.
                    http_server.listen(API_PORT)
                    IOLoop.instance().start()
            elif APP_PROD_METHOD == APP_PROD_METHOD_MEINHELD:
                try:
                    import meinheld
                except ImportError as e:
                    logger.exception(e)
                else:
                    meinheld.listen((API_HOST, API_PORT))
                    meinheld.run(app)
            else:
                logger.error("unsupported APP_PROD_METHOD")
                return
        else:
            app.run(host=API_HOST, port=API_PORT, threaded=API_THREADED, use_reloader=False)

    def run(self):
        """
        Start every enabled component in its own process and wait for them.

        The created processes are stored in the module-level
        ``tester_process`` / ``getter_process`` / ``server_process`` handles.
        On KeyboardInterrupt the started processes are terminated; in every
        case they are joined and their final state is logged.
        """
        global tester_process, getter_process, server_process
        try:
            logger.info('starting proxypool...')
            # The pid is only assigned once the process has been started, so
            # each process is started before its pid is logged.
            if ENABLE_TESTER:
                tester_process = multiprocessing.Process(
                    target=self.run_tester)
                tester_process.start()
                logger.info(f'starting tester, pid {tester_process.pid}...')

            if ENABLE_GETTER:
                getter_process = multiprocessing.Process(
                    target=self.run_getter)
                getter_process.start()
                logger.info(f'starting getter, pid {getter_process.pid}...')

            if ENABLE_SERVER:
                server_process = multiprocessing.Process(
                    target=self.run_server)
                server_process.start()
                logger.info(f'starting server, pid {server_process.pid}...')

            for process in (tester_process, getter_process, server_process):
                if process is not None:
                    process.join()
        except KeyboardInterrupt:
            logger.info('received keyboard interrupt signal')
            for process in (tester_process, getter_process, server_process):
                if process is not None:
                    process.terminate()
        finally:
            # must call join method before calling is_alive
            for process in (tester_process, getter_process, server_process):
                if process is not None:
                    process.join()
            # A component that was never started has no process object and is
            # reported as dead instead of raising AttributeError on None.
            tester_alive = tester_process is not None and tester_process.is_alive()
            getter_alive = getter_process is not None and getter_process.is_alive()
            server_alive = server_process is not None and server_process.is_alive()
            logger.info(
                f'tester is {"alive" if tester_alive else "dead"}')
            logger.info(
                f'getter is {"alive" if getter_alive else "dead"}')
            logger.info(
                f'server is {"alive" if server_alive else "dead"}')
            logger.info('proxy terminated')
if __name__ == '__main__':
    # Script entry point: create a scheduler and run it.
    Scheduler().run()