Skip to content

Commit e0803aa

Browse files
committed
Merge pull request ipython#1930 from minrk/limitdictdb
Add size-limiting to the DictDB backend. Adds configurables for culling old records when limits are exceeded. Default limits: 1GB of buffers, 1024 records, 10% cull.
2 parents b6278b2 + 7a85cbc commit e0803aa

File tree

3 files changed

+166
-13
lines changed

3 files changed

+166
-13
lines changed

IPython/parallel/controller/dictdb.py

Lines changed: 93 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151

5252
from IPython.config.configurable import LoggingConfigurable
5353

54-
from IPython.utils.traitlets import Dict, Unicode, Instance
54+
from IPython.utils.traitlets import Dict, Unicode, Integer, Float
5555

5656
filters = {
5757
'$lt' : lambda a,b: a < b,
@@ -100,6 +100,33 @@ class DictDB(BaseDB):
100100
"""
101101

102102
_records = Dict()
103+
_culled_ids = set() # set of ids which have been culled
104+
_buffer_bytes = Integer(0) # running total of the bytes in the DB
105+
106+
size_limit = Integer(1024*1024, config=True,
107+
help="""The maximum total size (in bytes) of the buffers stored in the db
108+
109+
When the db exceeds this size, the oldest records will be culled until
110+
the total size is under size_limit * (1-cull_fraction).
111+
"""
112+
)
113+
record_limit = Integer(1024, config=True,
114+
help="""The maximum number of records in the db
115+
116+
When the history exceeds this size, the first record_limit * cull_fraction
117+
records will be culled.
118+
"""
119+
)
120+
cull_fraction = Float(0.1, config=True,
121+
help="""The fraction by which the db should culled when one of the limits is exceeded
122+
123+
In general, the db size will spend most of its time with a size in the range:
124+
125+
[limit * (1-cull_fraction), limit]
126+
127+
for each of size_limit and record_limit.
128+
"""
129+
)
103130

104131
def _match_one(self, rec, tests):
105132
"""Check if a specific record matches tests."""
@@ -130,34 +157,92 @@ def _extract_subdict(self, rec, keys):
130157
for key in keys:
131158
d[key] = rec[key]
132159
return copy(d)
160+
161+
# methods for monitoring size / culling history
162+
163+
def _add_bytes(self, rec):
164+
for key in ('buffers', 'result_buffers'):
165+
for buf in rec.get(key) or []:
166+
self._buffer_bytes += len(buf)
167+
168+
self._maybe_cull()
169+
170+
def _drop_bytes(self, rec):
171+
for key in ('buffers', 'result_buffers'):
172+
for buf in rec.get(key) or []:
173+
self._buffer_bytes -= len(buf)
174+
175+
def _cull_oldest(self, n=1):
176+
"""cull the oldest N records"""
177+
for msg_id in self.get_history()[:n]:
178+
self.log.debug("Culling record: %r", msg_id)
179+
self._culled_ids.add(msg_id)
180+
self.drop_record(msg_id)
181+
182+
def _maybe_cull(self):
183+
# cull by count:
184+
if len(self._records) > self.record_limit:
185+
to_cull = int(self.cull_fraction * self.record_limit)
186+
self.log.info("%i records exceeds limit of %i, culling oldest %i",
187+
len(self._records), self.record_limit, to_cull
188+
)
189+
self._cull_oldest(to_cull)
190+
191+
# cull by size:
192+
if self._buffer_bytes > self.size_limit:
193+
limit = self.size_limit * (1 - self.cull_fraction)
194+
195+
before = self._buffer_bytes
196+
before_count = len(self._records)
197+
culled = 0
198+
while self._buffer_bytes > limit:
199+
self._cull_oldest(1)
200+
culled += 1
201+
202+
self.log.info("%i records with total buffer size %i exceeds limit: %i. Culled oldest %i records.",
203+
before_count, before, self.size_limit, culled
204+
)
205+
206+
# public API methods:
133207

134208
def add_record(self, msg_id, rec):
135209
"""Add a new Task Record, by msg_id."""
136210
if msg_id in self._records:
137211
raise KeyError("Already have msg_id %r"%(msg_id))
138212
self._records[msg_id] = rec
213+
self._add_bytes(rec)
214+
self._maybe_cull()
139215

140216
def get_record(self, msg_id):
141217
"""Get a specific Task Record, by msg_id."""
218+
if msg_id in self._culled_ids:
219+
raise KeyError("Record %r has been culled for size" % msg_id)
142220
if not msg_id in self._records:
143221
raise KeyError("No such msg_id %r"%(msg_id))
144222
return copy(self._records[msg_id])
145223

146224
def update_record(self, msg_id, rec):
147225
"""Update the data in an existing record."""
148-
self._records[msg_id].update(rec)
226+
if msg_id in self._culled_ids:
227+
raise KeyError("Record %r has been culled for size" % msg_id)
228+
_rec = self._records[msg_id]
229+
self._drop_bytes(_rec)
230+
_rec.update(rec)
231+
self._add_bytes(_rec)
149232

150233
def drop_matching_records(self, check):
151234
"""Remove a record from the DB."""
152235
matches = self._match(check)
153-
for m in matches:
154-
del self._records[m['msg_id']]
236+
for rec in matches:
237+
self._drop_bytes(rec)
238+
del self._records[rec['msg_id']]
155239

156240
def drop_record(self, msg_id):
157241
"""Remove a record from the DB."""
242+
rec = self._records[msg_id]
243+
self._drop_bytes(rec)
158244
del self._records[msg_id]
159245

160-
161246
def find_records(self, check, keys=None):
162247
"""Find records matching a query dict, optionally extracting subset of keys.
163248
@@ -178,17 +263,18 @@ def find_records(self, check, keys=None):
178263
else:
179264
return matches
180265

181-
182266
def get_history(self):
183267
"""get all msg_ids, ordered by time submitted."""
184268
msg_ids = self._records.keys()
185269
return sorted(msg_ids, key=lambda m: self._records[m]['submitted'])
186270

271+
187272
NODATA = KeyError("NoDB backend doesn't store any data. "
188273
"Start the Controller with a DB backend to enable resubmission / result persistence."
189274
)
190275

191-
class NoDB(DictDB):
276+
277+
class NoDB(BaseDB):
192278
"""A blackhole db backend that actually stores no information.
193279
194280
Provides the full DB interface, but raises KeyErrors on any

IPython/parallel/tests/test_db.py

Lines changed: 70 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,23 +45,23 @@ def setup():
4545
temp_db = tempfile.NamedTemporaryFile(suffix='.db').name
4646

4747

48-
class TestDictBackend(TestCase):
48+
class TaskDBTest:
4949
def setUp(self):
5050
self.session = Session()
5151
self.db = self.create_db()
5252
self.load_records(16)
5353

5454
def create_db(self):
55-
return DictDB()
55+
raise NotImplementedError
5656

57-
def load_records(self, n=1):
57+
def load_records(self, n=1, buffer_size=100):
5858
"""load n records for testing"""
5959
#sleep 1/10 s, to ensure timestamp is different to previous calls
6060
time.sleep(0.1)
6161
msg_ids = []
6262
for i in range(n):
6363
msg = self.session.msg('apply_request', content=dict(a=5))
64-
msg['buffers'] = []
64+
msg['buffers'] = [os.urandom(buffer_size)]
6565
rec = init_record(msg)
6666
msg_id = msg['header']['msg_id']
6767
msg_ids.append(msg_id)
@@ -228,7 +228,72 @@ def test_pop_safe_find_keys(self):
228228
self.assertEqual(rec2['header']['msg_id'], msg_id)
229229

230230

231-
class TestSQLiteBackend(TestDictBackend):
231+
class TestDictBackend(TaskDBTest, TestCase):
232+
233+
def create_db(self):
234+
return DictDB()
235+
236+
def test_cull_count(self):
237+
self.db = self.create_db() # skip the load-records init from setUp
238+
self.db.record_limit = 20
239+
self.db.cull_fraction = 0.2
240+
self.load_records(20)
241+
self.assertEquals(len(self.db.get_history()), 20)
242+
self.load_records(1)
243+
# 0.2 * 20 = 4, 21 - 4 = 17
244+
self.assertEquals(len(self.db.get_history()), 17)
245+
self.load_records(3)
246+
self.assertEquals(len(self.db.get_history()), 20)
247+
self.load_records(1)
248+
self.assertEquals(len(self.db.get_history()), 17)
249+
250+
for i in range(100):
251+
self.load_records(1)
252+
self.assertTrue(len(self.db.get_history()) >= 17)
253+
self.assertTrue(len(self.db.get_history()) <= 20)
254+
255+
def test_cull_size(self):
256+
self.db = self.create_db() # skip the load-records init from setUp
257+
self.db.size_limit = 1000
258+
self.db.cull_fraction = 0.2
259+
self.load_records(100, buffer_size=10)
260+
self.assertEquals(len(self.db.get_history()), 100)
261+
self.load_records(1, buffer_size=0)
262+
self.assertEquals(len(self.db.get_history()), 101)
263+
self.load_records(1, buffer_size=1)
264+
# 0.2 * 100 = 20, 101 - 20 = 81
265+
self.assertEquals(len(self.db.get_history()), 81)
266+
267+
def test_cull_size_drop(self):
268+
"""dropping records updates tracked buffer size"""
269+
self.db = self.create_db() # skip the load-records init from setUp
270+
self.db.size_limit = 1000
271+
self.db.cull_fraction = 0.2
272+
self.load_records(100, buffer_size=10)
273+
self.assertEquals(len(self.db.get_history()), 100)
274+
self.db.drop_record(self.db.get_history()[-1])
275+
self.assertEquals(len(self.db.get_history()), 99)
276+
self.load_records(1, buffer_size=5)
277+
self.assertEquals(len(self.db.get_history()), 100)
278+
self.load_records(1, buffer_size=5)
279+
self.assertEquals(len(self.db.get_history()), 101)
280+
self.load_records(1, buffer_size=1)
281+
self.assertEquals(len(self.db.get_history()), 81)
282+
283+
def test_cull_size_update(self):
284+
"""updating records updates tracked buffer size"""
285+
self.db = self.create_db() # skip the load-records init from setUp
286+
self.db.size_limit = 1000
287+
self.db.cull_fraction = 0.2
288+
self.load_records(100, buffer_size=10)
289+
self.assertEquals(len(self.db.get_history()), 100)
290+
msg_id = self.db.get_history()[-1]
291+
self.db.update_record(msg_id, dict(result_buffers = [os.urandom(10)], buffers=[]))
292+
self.assertEquals(len(self.db.get_history()), 100)
293+
self.db.update_record(msg_id, dict(result_buffers = [os.urandom(11)], buffers=[]))
294+
self.assertEquals(len(self.db.get_history()), 79)
295+
296+
class TestSQLiteBackend(TaskDBTest, TestCase):
232297

233298
@dec.skip_without('sqlite3')
234299
def create_db(self):

IPython/parallel/tests/test_mongodb.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
# Imports
1717
#-------------------------------------------------------------------------------
1818

19+
from unittest import TestCase
20+
1921
from nose import SkipTest
2022

2123
from pymongo import Connection
@@ -28,7 +30,7 @@
2830
except Exception:
2931
c=None
3032

31-
class TestMongoBackend(test_db.TestDictBackend):
33+
class TestMongoBackend(test_db.TaskDBTest, TestCase):
3234
"""MongoDB backend tests"""
3335

3436
def create_db(self):

0 commit comments

Comments
 (0)