This repository was archived by the owner on Oct 26, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathworker.py
More file actions
65 lines (51 loc) · 1.93 KB
/
worker.py
File metadata and controls
65 lines (51 loc) · 1.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import multiprocessing
import os
import threading
import traceback
import time
from typing import Type
from job import Job, JobTaskNeedsBackoff
class Worker:
def __init__(self, job: Job):
self._job = job
self._stop_requested_event = multiprocessing.Event()
super().__init__()
def request_stop(self):
self._stop_requested_event.set()
def run(self):
try:
self._job.setup()
try:
while not self._stop_requested_event.is_set() and not self._job.is_failed():
try:
self._job.do_single_task()
except JobTaskNeedsBackoff as ex:
print(f"Job requested backoff: {ex}")
self.sleep_for_backoff(ex.seconds)
except Exception as ex:
if os.environ.get('WORKER_CONTINUE_ON_EXCEPTION'):
print(f"do_single_task raised an unhandled exception: {ex}")
traceback.print_exc()
print(f"Continuing because WORKER_CONTINUE_ON_EXCEPTION is set")
else:
raise
finally:
self._job.teardown()
except BaseException:
print("Exception made it to the outer exception check of run():")
traceback.print_exc()
def sleep_for_backoff(self, seconds: int):
death_time = time.time() + seconds
while time.time() < death_time:
# if a stop is requested, stop backing off, we're going to exit anyways
if self._stop_requested_event.is_set():
break
time.sleep(1)
class ThreadWorker(Worker, threading.Thread):
pass
class ProcessWorker(Worker, multiprocessing.Process):
pass
def start_job_worker(job: Job, worker_class: Type[Worker]) -> Worker:
w = worker_class(job)
w.start()
return w