@@ -28,9 +28,13 @@ int large_buf_t::compute_num_levels(block_size_t block_size, int64_t end_offset)
2828 return levels;
2929}
3030
31+ int large_buf_t::compute_num_sublevels (block_size_t block_size, int64_t end_offset) {
32+ int levels = compute_num_levels (block_size, end_offset);
33+ return compute_large_buf_ref_num_inlined (block_size, end_offset) == 1 ? levels : levels - 1 ;
34+ }
3135
3236int large_buf_t::compute_large_buf_ref_num_inlined (block_size_t block_size, int64_t end_offset) {
33- assert (end_offset >= 0 );
37+ rassert (end_offset >= 0 );
3438 int levels = compute_num_levels (block_size, end_offset);
3539 if (levels == 1 ) {
3640 return 1 ;
@@ -100,6 +104,34 @@ buftree_t *large_buf_t::allocate_buftree(int64_t offset, int64_t size, int level
100104 return ret;
101105}
102106
107+ void large_buf_t::allocates_part_of_tree (std::vector<buftree_t *> *ptrs, block_id_t *block_ids, int64_t offset, int64_t size, int64_t sublevels) {
108+ int64_t step = max_offset (sublevels);
109+ for (int k = 0 ; int64_t (k) * step < offset + size; ++k) {
110+ int64_t i = int64_t (k) * step;
111+
112+ rassert ((int )ptrs->size () >= k);
113+
114+ if ((int64_t )ptrs->size () == k) {
115+ ptrs->push_back (NULL );
116+ block_ids[k] = NULL_BLOCK_ID;
117+ }
118+
119+ if (i + step > offset) {
120+ int64_t child_offset = std::max (offset - i, 0L );
121+ int64_t child_end_offset = std::min (offset + size - i, step);
122+
123+ if ((*ptrs)[k] == NULL ) {
124+ block_id_t id;
125+ buftree_t *child = allocate_buftree (child_offset, child_end_offset - child_offset, sublevels, &id);
126+ (*ptrs)[k] = child;
127+ block_ids[k] = id;
128+ } else {
129+ allocate_part_of_tree ((*ptrs)[k], child_offset, child_end_offset - child_offset, sublevels);
130+ }
131+ }
132+ }
133+ }
134+
103135void large_buf_t::allocate_part_of_tree (buftree_t *tr, int64_t offset, int64_t size, int levels) {
104136 rassert (tr->level == levels);
105137 if (levels == 1 ) {
@@ -146,12 +178,25 @@ void large_buf_t::allocate(int64_t _size, large_buf_ref *refout) {
146178
147179 root_ref.offset = 0 ;
148180 root_ref.size = _size;
149- root = allocate_buftree (0 , _size, num_levels (_size), &root_ref.block_id ());
181+
182+ int num_inlined = large_buf_t::compute_large_buf_ref_num_inlined (block_size, _size);
183+ roots.resize (num_inlined);
184+ if (num_inlined == 1 ) {
185+ roots[0 ] = allocate_buftree (0 , _size, num_levels (_size), &root_ref.block_ids [0 ]);
186+ } else {
187+ for (int i = 0 ; i < num_inlined; ++i) {
188+ int num_sublevels = num_levels (_size) - 1 ;
189+ rassert (num_sublevels >= 1 );
190+ int sz = max_offset (num_sublevels);
191+ roots[i] = allocate_buftree (0 , (i == num_inlined - 1 ? _size - i * sz : sz), num_sublevels,
192+ &root_ref.block_ids [i]);
193+ }
194+ }
150195
151196 state = loaded;
152197
153198 memcpy (refout, &root_ref, root_ref.refsize (block_size));
154- rassert (root ->level == num_levels (root_ref.offset + root_ref.size ));
199+ rassert (roots[ 0 ] ->level == num_levels (root_ref.offset + root_ref.size ) - (num_inlined != 1 ));
155200}
156201
157202struct tree_available_callback_t {
@@ -238,14 +283,15 @@ struct acquire_buftree_fsm_t : public block_available_callback_t, public tree_av
238283 }
239284};
240285
286+ // TODO get rid of this, just have large_buf_t : public
287+ // tree_available_callback_t.
241288struct lb_tree_available_callback_t : public tree_available_callback_t {
242289 large_buf_t *lb;
243290 explicit lb_tree_available_callback_t (large_buf_t *lb_) : lb(lb_) { }
244- void on_available (buftree_t *tr, int neg1) {
245- rassert (neg1 == -1 );
291+ void on_available (buftree_t *tr, int index) {
246292 large_buf_t *l = lb;
247293 delete this ;
248- l->buftree_acquired (tr);
294+ l->buftree_acquired (tr, index );
249295 }
250296};
251297
@@ -262,9 +308,35 @@ void large_buf_t::acquire_slice(const large_buf_ref *root_ref_, access_t access_
262308 state = loading;
263309
264310 int levels = num_levels (root_ref.offset + root_ref.size );
265- tree_available_callback_t *cb = new lb_tree_available_callback_t (this );
266- acquire_buftree_fsm_t *f = new acquire_buftree_fsm_t (this , root_ref.block_id (), root_ref.offset + slice_offset, slice_size, levels, cb, -1 );
267- f->go ();
311+ int num_inlined = compute_large_buf_ref_num_inlined (block_size, root_ref.offset + root_ref.size );
312+
313+ roots.resize (num_inlined);
314+ if (num_inlined == 1 ) {
315+ num_to_acquire = 1 ;
316+ tree_available_callback_t *cb = new lb_tree_available_callback_t (this );
317+ acquire_buftree_fsm_t *f = new acquire_buftree_fsm_t (this , root_ref.block_ids [0 ], root_ref.offset + slice_offset, slice_size, levels, cb, 0 );
318+ f->go ();
319+ } else {
320+ // Yet another case of slicing logic.
321+
322+ int num_sublevels = levels - 1 ;
323+ int64_t subsize = max_offset (num_sublevels);
324+ int64_t beg = root_ref.offset + slice_offset;
325+ int64_t end = beg + slice_size;
326+
327+ int i = beg / subsize;
328+ int e = ceil_divide (end, subsize);
329+
330+ num_to_acquire = e - i;
331+
332+ for (int i = beg / subsize, e = ceil_divide (end, subsize); i < e; ++i) {
333+ tree_available_callback_t *cb = new lb_tree_available_callback_t (this );
334+ int64_t thisbeg = std::max (beg, i * subsize);
335+ int64_t thisend = std::min (end, (i + 1 ) * subsize);
336+ acquire_buftree_fsm_t *f = new acquire_buftree_fsm_t (this , root_ref.block_ids [i], thisbeg, thisend - thisbeg, num_sublevels, cb, i);
337+ f->go ();
338+ }
339+ }
268340}
269341
270342void large_buf_t::acquire (const large_buf_ref *root_ref_, access_t access_, large_buf_available_callback_t *callback_) {
@@ -286,10 +358,18 @@ void large_buf_t::acquire_lhs(const large_buf_ref *root_ref_, access_t access_,
286358void large_buf_t::buftree_acquired (buftree_t *tr) {
287359 rassert (state == loading);
288360 rassert (tr);
289- root = tr;
290- rassert (tr->level == num_levels (root_ref.offset + root_ref.size ));
291- state = loaded;
292- callback->on_large_buf_available (this );
361+ rassert (0 <= index && size_t (index) < roots.size ());
362+ rassert (roots[index] == NULL );
363+
364+ roots[index] = tr;
365+
366+ rassert (tr->level == num_levels (root_ref.offset + root_ref.size ) - (compute_large_buf_ref_num_inlined (block_size, root_ref.offset + root_ref.size ) != 1 ));
367+
368+ num_to_acquire --;
369+ if (num_to_acquire == 0 ) {
370+ state = loaded;
371+ callback->on_large_buf_available (this );
372+ }
293373}
294374
295375buftree_t *large_buf_t ::add_level(buftree_t *tr, block_id_t id, block_id_t *new_id
@@ -325,32 +405,69 @@ buftree_t *large_buf_t::add_level(buftree_t *tr, block_id_t id, block_id_t *new_
325405 return ret;
326406}
327407
408+ std::vector<buftree_t *> large_buf_t::adds_level (const std::vector<buftree_t *>& tr, block_id_t *ids
409+ #ifndef NDEBUG
410+ , int nextlevels
411+ #endif
412+ ) {
413+ // We only need one buftree because the inlined memory size must
414+ // be less than internal node size
415+ buftree_t *ret = new buftree_t ();
416+ #ifndef NDEBUG
417+ ret->level = nextlevels;
418+ #endif
419+
420+ block_id_t new_id;
421+ ret->buf = transaction->allocate (&new_id);
422+
423+ #ifndef NDEBUG
424+ num_bufs++;
425+ #endif
426+
427+ large_buf_internal *node = ptr_cast<large_buf_internal>(ret->buf ->get_data_write ());
428+
429+ node->magic = large_buf_internal::expected_magic;
430+
431+ #ifndef NDEBUG
432+ for (int i = 0 ; i < num_internal_kids (); ++i) {
433+ node->kids [i] = NULL_BLOCK_ID;
434+ }
435+ #endif
436+
437+ for (int i = 0 , ie = tr.size (); i < ie; i++) {
438+ node->kids [i] = ids[i];
439+ #ifndef NDEBUG
440+ ids[i] = NULL_BLOCK_ID;
441+ #endif
442+ }
443+
444+ ret->children = tr;
445+ std::vector<buftree_t *> retvec (1 , ret);
446+ return retvec;
447+ }
448+
328449// TODO check for and support partial acquisition
329450void large_buf_t::append (int64_t extra_size, large_buf_ref *refout) {
330451 rassert (state == loaded);
331452
332- buftree_t *tr = root;
453+ const int64_t back = root_ref.offset + root_ref.size ;
454+ int prev_sublevels = num_sublevels (back);
455+ int new_sublevels = num_sublevels (back + extra_size);
333456
334- int64_t back = root_ref.offset + root_ref.size ;
335- int levels = num_levels (back + extra_size);
336-
337- rassert (tr->level == num_levels (back));
338-
339- block_id_t id = root_ref.block_id ();
340- for (int i = num_levels (root_ref.offset + root_ref.size ); i < levels; ++i) {
341- tr = add_level (tr, id, &id
457+ for (int i = prev_sublevels; i < new_sublevels; ++i) {
458+ roots = adds_level (roots, root_ref.block_ids
342459#ifndef NDEBUG
343- , i + 1
460+ , i + 1
344461#endif
345- );
462+ );
346463 }
347464
348- allocate_part_of_tree (tr, back, extra_size, levels);
465+ allocates_part_of_tree (&roots, root_ref.block_ids , back, extra_size, new_sublevels);
466+
349467 root_ref.size += extra_size;
350- root_ref.block_id () = id;
351- root = tr;
468+
352469 memcpy (refout, &root_ref, root_ref.refsize (block_size));
353- rassert (root ->level == num_levels (root_ref.offset + root_ref.size ));
470+ rassert (roots[ 0 ] ->level = num_sublevels (root_ref.offset + root_ref.size ));
354471}
355472
356473void large_buf_t::prepend (int64_t extra_size, large_buf_ref *refout) {
0 commit comments