forked from pyload/pyload
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathScheduler.py
More file actions
141 lines (110 loc) · 3.45 KB
/
Scheduler.py
File metadata and controls
141 lines (110 loc) · 3.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# -*- coding: utf-8 -*-
"""
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, see <http://www.gnu.org/licenses/>.
@author: mkaay
"""
from time import time
from heapq import heappop, heappush
from thread import start_new_thread
from threading import Lock
class AlreadyCalled(Exception):
pass
class Deferred():
def __init__(self):
self.call = []
self.result = ()
def addCallback(self, f, *cargs, **ckwargs):
self.call.append((f, cargs, ckwargs))
def callback(self, *args, **kwargs):
if self.result:
raise AlreadyCalled
self.result = (args, kwargs)
for f, cargs, ckwargs in self.call:
args += tuple(cargs)
kwargs.update(ckwargs)
f(*args ** kwargs)
class Scheduler():
def __init__(self, core):
self.core = core
self.queue = PriorityQueue()
def addJob(self, t, call, args=[], kwargs={}, threaded=True):
d = Deferred()
t += time()
j = Job(t, call, args, kwargs, d, threaded)
self.queue.put((t, j))
return d
def removeJob(self, d):
"""
:param d: defered object
:return: if job was deleted
"""
index = -1
for i, j in enumerate(self.queue):
if j[1].deferred == d:
index = i
if index >= 0:
del self.queue[index]
return True
return False
def work(self):
while True:
t, j = self.queue.get()
if not j:
break
else:
if t <= time():
j.start()
else:
self.queue.put((t, j))
break
class Job():
def __init__(self, time, call, args=[], kwargs={}, deferred=None, threaded=True):
self.time = float(time)
self.call = call
self.args = args
self.kwargs = kwargs
self.deferred = deferred
self.threaded = threaded
def run(self):
ret = self.call(*self.args, **self.kwargs)
if self.deferred is None:
return
else:
self.deferred.callback(ret)
def start(self):
if self.threaded:
start_new_thread(self.run, ())
else:
self.run()
class PriorityQueue():
""" a non blocking priority queue """
def __init__(self):
self.queue = []
self.lock = Lock()
def __iter__(self):
return iter(self.queue)
def __delitem__(self, key):
del self.queue[key]
def put(self, element):
self.lock.acquire()
heappush(self.queue, element)
self.lock.release()
def get(self):
""" return element or None """
self.lock.acquire()
try:
el = heappop(self.queue)
return el
except IndexError:
return None, None
finally:
self.lock.release()