5151
from IPython.config.configurable import LoggingConfigurable

from IPython.utils.traitlets import Dict, Float, Instance, Integer, Set, Unicode
5555
5656filters = {
5757 '$lt' : lambda a ,b : a < b ,
@@ -100,6 +100,33 @@ class DictDB(BaseDB):
100100 """
101101
# In-memory storage of the records, keyed by msg_id.
_records = Dict()
# msg_ids whose records have been culled.  Declared as a trait so that
# every instance owns its own set; a bare ``set()`` class attribute is a
# single object shared (and mutated) by all instances of the class.
_culled_ids = Set()
# running total of the bytes in the DB
_buffer_bytes = Integer(0)

size_limit = Integer(1024 * 1024, config=True,
    help="""The maximum total size (in bytes) of the buffers stored in the db

    When the db exceeds this size, the oldest records will be culled until
    the total size is under size_limit * (1-cull_fraction).
    """
)
record_limit = Integer(1024, config=True,
    help="""The maximum number of records in the db

    When the history exceeds this size, the first record_limit * cull_fraction
    records will be culled.
    """
)
cull_fraction = Float(0.1, config=True,
    help="""The fraction by which the db should be culled when one of the limits is exceeded

    In general, the db size will spend most of its time with a size in the range:

    [limit * (1-cull_fraction), limit]

    for each of size_limit and record_limit.
    """
)
103130
104131 def _match_one (self , rec , tests ):
105132 """Check if a specific record matches tests."""
@@ -130,34 +157,92 @@ def _extract_subdict(self, rec, keys):
130157 for key in keys :
131158 d [key ] = rec [key ]
132159 return copy (d )
160+
161+ # methods for monitoring size / culling history
162+
163+ def _add_bytes (self , rec ):
164+ for key in ('buffers' , 'result_buffers' ):
165+ for buf in rec .get (key ) or []:
166+ self ._buffer_bytes += len (buf )
167+
168+ self ._maybe_cull ()
169+
170+ def _drop_bytes (self , rec ):
171+ for key in ('buffers' , 'result_buffers' ):
172+ for buf in rec .get (key ) or []:
173+ self ._buffer_bytes -= len (buf )
174+
175+ def _cull_oldest (self , n = 1 ):
176+ """cull the oldest N records"""
177+ for msg_id in self .get_history ()[:n ]:
178+ self .log .debug ("Culling record: %r" , msg_id )
179+ self ._culled_ids .add (msg_id )
180+ self .drop_record (msg_id )
181+
182+ def _maybe_cull (self ):
183+ # cull by count:
184+ if len (self ._records ) > self .record_limit :
185+ to_cull = int (self .cull_fraction * self .record_limit )
186+ self .log .info ("%i records exceeds limit of %i, culling oldest %i" ,
187+ len (self ._records ), self .record_limit , to_cull
188+ )
189+ self ._cull_oldest (to_cull )
190+
191+ # cull by size:
192+ if self ._buffer_bytes > self .size_limit :
193+ limit = self .size_limit * (1 - self .cull_fraction )
194+
195+ before = self ._buffer_bytes
196+ before_count = len (self ._records )
197+ culled = 0
198+ while self ._buffer_bytes > limit :
199+ self ._cull_oldest (1 )
200+ culled += 1
201+
202+ self .log .info ("%i records with total buffer size %i exceeds limit: %i. Culled oldest %i records." ,
203+ before_count , before , self .size_limit , culled
204+ )
205+
206+ # public API methods:
133207
def add_record(self, msg_id, rec):
    """Add a new Task Record, by msg_id.

    The record is stored under ``msg_id``, its buffers are counted
    towards the size total, and the limits are checked (which may cull
    old records).

    Raises KeyError if a record with this msg_id is already stored.
    """
    if msg_id in self._records:
        raise KeyError("Already have msg_id %r" % (msg_id))
    # A re-added id is live again: forget that it was culled, otherwise
    # get_record / update_record would keep rejecting the stored record.
    self._culled_ids.discard(msg_id)
    self._records[msg_id] = rec
    self._add_bytes(rec)
    self._maybe_cull()
139215
def get_record(self, msg_id):
    """Get a specific Task Record, by msg_id.

    Returns a (shallow) copy of the stored record.

    Raises KeyError if the record has been culled, or if no record with
    this msg_id is stored.
    """
    if msg_id in self._culled_ids:
        raise KeyError("Record %r has been culled for size" % msg_id)
    records = self._records
    if msg_id in records:
        return copy(records[msg_id])
    raise KeyError("No such msg_id %r" % (msg_id))
145223
def update_record(self, msg_id, rec):
    """Update the data in an existing record.

    The stored record is updated in place with the contents of ``rec``.

    Raises KeyError if the record has been culled, or if no record with
    this msg_id is stored.
    """
    if msg_id in self._culled_ids:
        raise KeyError("Record %r has been culled for size" % msg_id)
    stored = self._records[msg_id]
    # un-count the old buffers, apply the update, then count the result,
    # so the size total reflects the record as it is now
    self._drop_bytes(stored)
    stored.update(rec)
    self._add_bytes(stored)
149232
def drop_matching_records(self, check):
    """Remove all records matching ``check`` from the DB.

    Every record returned by ``self._match(check)`` has its buffers
    subtracted from the size total and is then deleted by its 'msg_id'.
    """
    records = self._records
    for matched in self._match(check):
        self._drop_bytes(matched)
        del records[matched['msg_id']]
155239
def drop_record(self, msg_id):
    """Remove a record from the DB.

    The buffers of the record are subtracted from the size total before
    the record is deleted.  Raises KeyError if msg_id is not stored.
    """
    records = self._records
    self._drop_bytes(records[msg_id])
    del records[msg_id]
159245
160-
161246 def find_records (self , check , keys = None ):
162247 """Find records matching a query dict, optionally extracting subset of keys.
163248
@@ -178,17 +263,18 @@ def find_records(self, check, keys=None):
178263 else :
179264 return matches
180265
181-
def get_history(self):
    """get all msg_ids, ordered by time submitted."""
    records = self._records

    def submitted(msg_id):
        # sort key: the 'submitted' entry of the record
        return records[msg_id]['submitted']

    return sorted(records.keys(), key=submitted)
186270
271+
# Pre-built KeyError explaining that the NoDB backend keeps no data.
# NOTE(review): presumably raised by the NoDB methods defined below - confirm.
NODATA = KeyError(
    "NoDB backend doesn't store any data. "
    "Start the Controller with a DB backend to enable resubmission / result persistence."
)
190275
191- class NoDB (DictDB ):
276+
277+ class NoDB (BaseDB ):
192278 """A blackhole db backend that actually stores no information.
193279
194280 Provides the full DB interface, but raises KeyErrors on any
0 commit comments