Skip to content

Commit feff595

Browse files
committed
Applied 'Partly implemented complex largebufs I.' e52abcddcf47de51aa83e2dd8e03ccb11edb6582
1 parent 45bd649 commit feff595

3 files changed

Lines changed: 158 additions & 34 deletions

File tree

src/buffer_cache/large_buf.cc

Lines changed: 145 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,13 @@ int large_buf_t::compute_num_levels(block_size_t block_size, int64_t end_offset)
2828
return levels;
2929
}
3030

31+
int large_buf_t::compute_num_sublevels(block_size_t block_size, int64_t end_offset) {
32+
int levels = compute_num_levels(block_size, end_offset);
33+
return compute_large_buf_ref_num_inlined(block_size, end_offset) == 1 ? levels : levels - 1;
34+
}
3135

3236
int large_buf_t::compute_large_buf_ref_num_inlined(block_size_t block_size, int64_t end_offset) {
33-
assert(end_offset >= 0);
37+
rassert(end_offset >= 0);
3438
int levels = compute_num_levels(block_size, end_offset);
3539
if (levels == 1) {
3640
return 1;
@@ -100,6 +104,34 @@ buftree_t *large_buf_t::allocate_buftree(int64_t offset, int64_t size, int level
100104
return ret;
101105
}
102106

107+
void large_buf_t::allocates_part_of_tree(std::vector<buftree_t *> *ptrs, block_id_t *block_ids, int64_t offset, int64_t size, int64_t sublevels) {
108+
int64_t step = max_offset(sublevels);
109+
for (int k = 0; int64_t(k) * step < offset + size; ++k) {
110+
int64_t i = int64_t(k) * step;
111+
112+
rassert((int)ptrs->size() >= k);
113+
114+
if ((int64_t)ptrs->size() == k) {
115+
ptrs->push_back(NULL);
116+
block_ids[k] = NULL_BLOCK_ID;
117+
}
118+
119+
if (i + step > offset) {
120+
int64_t child_offset = std::max(offset - i, 0L);
121+
int64_t child_end_offset = std::min(offset + size - i, step);
122+
123+
if ((*ptrs)[k] == NULL) {
124+
block_id_t id;
125+
buftree_t *child = allocate_buftree(child_offset, child_end_offset - child_offset, sublevels, &id);
126+
(*ptrs)[k] = child;
127+
block_ids[k] = id;
128+
} else {
129+
allocate_part_of_tree((*ptrs)[k], child_offset, child_end_offset - child_offset, sublevels);
130+
}
131+
}
132+
}
133+
}
134+
103135
void large_buf_t::allocate_part_of_tree(buftree_t *tr, int64_t offset, int64_t size, int levels) {
104136
rassert(tr->level == levels);
105137
if (levels == 1) {
@@ -146,12 +178,25 @@ void large_buf_t::allocate(int64_t _size, large_buf_ref *refout) {
146178

147179
root_ref.offset = 0;
148180
root_ref.size = _size;
149-
root = allocate_buftree(0, _size, num_levels(_size), &root_ref.block_id());
181+
182+
int num_inlined = large_buf_t::compute_large_buf_ref_num_inlined(block_size, _size);
183+
roots.resize(num_inlined);
184+
if (num_inlined == 1) {
185+
roots[0] = allocate_buftree(0, _size, num_levels(_size), &root_ref.block_ids[0]);
186+
} else {
187+
for (int i = 0; i < num_inlined; ++i) {
188+
int num_sublevels = num_levels(_size) - 1;
189+
rassert(num_sublevels >= 1);
190+
int sz = max_offset(num_sublevels);
191+
roots[i] = allocate_buftree(0, (i == num_inlined - 1 ? _size - i * sz : sz), num_sublevels,
192+
&root_ref.block_ids[i]);
193+
}
194+
}
150195

151196
state = loaded;
152197

153198
memcpy(refout, &root_ref, root_ref.refsize(block_size));
154-
rassert(root->level == num_levels(root_ref.offset + root_ref.size));
199+
rassert(roots[0]->level == num_levels(root_ref.offset + root_ref.size) - (num_inlined != 1));
155200
}
156201

157202
struct tree_available_callback_t {
@@ -238,14 +283,15 @@ struct acquire_buftree_fsm_t : public block_available_callback_t, public tree_av
238283
}
239284
};
240285

286+
// TODO get rid of this, just have large_buf_t : public
287+
// tree_available_callback_t.
241288
struct lb_tree_available_callback_t : public tree_available_callback_t {
242289
large_buf_t *lb;
243290
explicit lb_tree_available_callback_t(large_buf_t *lb_) : lb(lb_) { }
244-
void on_available(buftree_t *tr, int neg1) {
245-
rassert(neg1 == -1);
291+
void on_available(buftree_t *tr, int index) {
246292
large_buf_t *l = lb;
247293
delete this;
248-
l->buftree_acquired(tr);
294+
l->buftree_acquired(tr, index);
249295
}
250296
};
251297

@@ -262,9 +308,35 @@ void large_buf_t::acquire_slice(const large_buf_ref *root_ref_, access_t access_
262308
state = loading;
263309

264310
int levels = num_levels(root_ref.offset + root_ref.size);
265-
tree_available_callback_t *cb = new lb_tree_available_callback_t(this);
266-
acquire_buftree_fsm_t *f = new acquire_buftree_fsm_t(this, root_ref.block_id(), root_ref.offset + slice_offset, slice_size, levels, cb, -1);
267-
f->go();
311+
int num_inlined = compute_large_buf_ref_num_inlined(block_size, root_ref.offset + root_ref.size);
312+
313+
roots.resize(num_inlined);
314+
if (num_inlined == 1) {
315+
num_to_acquire = 1;
316+
tree_available_callback_t *cb = new lb_tree_available_callback_t(this);
317+
acquire_buftree_fsm_t *f = new acquire_buftree_fsm_t(this, root_ref.block_ids[0], root_ref.offset + slice_offset, slice_size, levels, cb, 0);
318+
f->go();
319+
} else {
320+
// Yet another case of slicing logic.
321+
322+
int num_sublevels = levels - 1;
323+
int64_t subsize = max_offset(num_sublevels);
324+
int64_t beg = root_ref.offset + slice_offset;
325+
int64_t end = beg + slice_size;
326+
327+
int i = beg / subsize;
328+
int e = ceil_divide(end, subsize);
329+
330+
num_to_acquire = e - i;
331+
332+
for (int i = beg / subsize, e = ceil_divide(end, subsize); i < e; ++i) {
333+
tree_available_callback_t *cb = new lb_tree_available_callback_t(this);
334+
int64_t thisbeg = std::max(beg, i * subsize);
335+
int64_t thisend = std::min(end, (i + 1) * subsize);
336+
acquire_buftree_fsm_t *f = new acquire_buftree_fsm_t(this, root_ref.block_ids[i], thisbeg, thisend - thisbeg, num_sublevels, cb, i);
337+
f->go();
338+
}
339+
}
268340
}
269341

270342
void large_buf_t::acquire(const large_buf_ref *root_ref_, access_t access_, large_buf_available_callback_t *callback_) {
@@ -286,10 +358,18 @@ void large_buf_t::acquire_lhs(const large_buf_ref *root_ref_, access_t access_,
286358
void large_buf_t::buftree_acquired(buftree_t *tr) {
287359
rassert(state == loading);
288360
rassert(tr);
289-
root = tr;
290-
rassert(tr->level == num_levels(root_ref.offset + root_ref.size));
291-
state = loaded;
292-
callback->on_large_buf_available(this);
361+
rassert(0 <= index && size_t(index) < roots.size());
362+
rassert(roots[index] == NULL);
363+
364+
roots[index] = tr;
365+
366+
rassert(tr->level == num_levels(root_ref.offset + root_ref.size) - (compute_large_buf_ref_num_inlined(block_size, root_ref.offset + root_ref.size) != 1));
367+
368+
num_to_acquire --;
369+
if (num_to_acquire == 0) {
370+
state = loaded;
371+
callback->on_large_buf_available(this);
372+
}
293373
}
294374

295375
buftree_t *large_buf_t::add_level(buftree_t *tr, block_id_t id, block_id_t *new_id
@@ -325,32 +405,69 @@ buftree_t *large_buf_t::add_level(buftree_t *tr, block_id_t id, block_id_t *new_
325405
return ret;
326406
}
327407

408+
std::vector<buftree_t *> large_buf_t::adds_level(const std::vector<buftree_t *>& tr, block_id_t *ids
409+
#ifndef NDEBUG
410+
, int nextlevels
411+
#endif
412+
) {
413+
// We only need one buftree because the inlined memory size must
414+
// be less than internal node size
415+
buftree_t *ret = new buftree_t();
416+
#ifndef NDEBUG
417+
ret->level = nextlevels;
418+
#endif
419+
420+
block_id_t new_id;
421+
ret->buf = transaction->allocate(&new_id);
422+
423+
#ifndef NDEBUG
424+
num_bufs++;
425+
#endif
426+
427+
large_buf_internal *node = ptr_cast<large_buf_internal>(ret->buf->get_data_write());
428+
429+
node->magic = large_buf_internal::expected_magic;
430+
431+
#ifndef NDEBUG
432+
for (int i = 0; i < num_internal_kids(); ++i) {
433+
node->kids[i] = NULL_BLOCK_ID;
434+
}
435+
#endif
436+
437+
for (int i = 0, ie = tr.size(); i < ie; i++) {
438+
node->kids[i] = ids[i];
439+
#ifndef NDEBUG
440+
ids[i] = NULL_BLOCK_ID;
441+
#endif
442+
}
443+
444+
ret->children = tr;
445+
std::vector<buftree_t *> retvec(1, ret);
446+
return retvec;
447+
}
448+
328449
// TODO check for and support partial acquisition
329450
void large_buf_t::append(int64_t extra_size, large_buf_ref *refout) {
330451
rassert(state == loaded);
331452

332-
buftree_t *tr = root;
453+
const int64_t back = root_ref.offset + root_ref.size;
454+
int prev_sublevels = num_sublevels(back);
455+
int new_sublevels = num_sublevels(back + extra_size);
333456

334-
int64_t back = root_ref.offset + root_ref.size;
335-
int levels = num_levels(back + extra_size);
336-
337-
rassert(tr->level == num_levels(back));
338-
339-
block_id_t id = root_ref.block_id();
340-
for (int i = num_levels(root_ref.offset + root_ref.size); i < levels; ++i) {
341-
tr = add_level(tr, id, &id
457+
for (int i = prev_sublevels; i < new_sublevels; ++i) {
458+
roots = adds_level(roots, root_ref.block_ids
342459
#ifndef NDEBUG
343-
, i + 1
460+
, i + 1
344461
#endif
345-
);
462+
);
346463
}
347464

348-
allocate_part_of_tree(tr, back, extra_size, levels);
465+
allocates_part_of_tree(&roots, root_ref.block_ids, back, extra_size, new_sublevels);
466+
349467
root_ref.size += extra_size;
350-
root_ref.block_id() = id;
351-
root = tr;
468+
352469
memcpy(refout, &root_ref, root_ref.refsize(block_size));
353-
rassert(root->level == num_levels(root_ref.offset + root_ref.size));
470+
rassert(roots[0]->level = num_sublevels(root_ref.offset + root_ref.size));
354471
}
355472

356473
void large_buf_t::prepend(int64_t extra_size, large_buf_ref *refout) {

src/buffer_cache/large_buf.hpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,9 @@ class large_buf_t {
5151
large_buf_ref root_ref;
5252
char root_ref_bytes[LBREF_SIZE];
5353
};
54-
buftree_t *root;
54+
std::vector<buftree_t *> roots;
5555
access_t access;
56+
int num_to_acquire;
5657
large_buf_available_callback_t *callback;
5758

5859
transaction_t *transaction;
@@ -109,14 +110,15 @@ class large_buf_t {
109110

110111
void index_acquired(buf_t *buf);
111112
void segment_acquired(buf_t *buf, uint16_t ix);
112-
void buftree_acquired(buftree_t *tr);
113+
void buftree_acquired(buftree_t *tr, int index);
113114

114115
friend struct acquire_buftree_fsm_t;
115116

116117
static int64_t cache_size_to_leaf_bytes(block_size_t block_size);
117118
static int64_t cache_size_to_internal_kids(block_size_t block_size);
118119
static int64_t compute_max_offset(block_size_t block_size, int levels);
119120
static int compute_num_levels(block_size_t block_size, int64_t end_offset);
121+
static int compute_num_sublevels(block_size_t block_size, int64_t end_offset);
120122

121123
static int compute_large_buf_ref_num_inlined(block_size_t block_size, int64_t end_offset);
122124

@@ -125,6 +127,7 @@ class large_buf_t {
125127
int64_t num_internal_kids() const;
126128
int64_t max_offset(int levels) const;
127129
int num_levels(int64_t end_offset) const;
130+
int num_sublevels(int64_t end_offset) const;
128131

129132
buftree_t *allocate_buftree(int64_t size, int64_t offset, int levels, block_id_t *block_id);
130133
buftree_t *acquire_buftree(block_id_t block_id, int64_t offset, int64_t size, int levels, tree_available_callback_t *cb);
@@ -135,7 +138,13 @@ class large_buf_t {
135138
, int nextlevels
136139
#endif
137140
);
141+
std::vector<buftree_t *> adds_level(const std::vector<buftree_t *>& tr, block_id_t *ids
142+
#ifndef NDEBUG
143+
, int nextlevels
144+
#endif
145+
);
138146
void allocate_part_of_tree(buftree_t *tr, int64_t offset, int64_t size, int levels);
147+
void allocates_part_of_tree(std::vector<buftree_t *> *ptrs, block_id_t *block_ids, int64_t offset, int64_t size, int64_t sublevels);
139148
buftree_t *walk_tree_structure(buftree_t *tr, int64_t offset, int64_t size, int levels, void (*bufdoer)(large_buf_t *, buf_t *), buftree_t *(*buftree_cleaner)(buftree_t *));
140149
void delete_tree_structure(buftree_t *tr, int64_t offset, int64_t size, int levels);
141150
void only_mark_deleted_tree_structure(buftree_t *tr, int64_t offset, int64_t size, int levels);

src/buffer_cache/types.cc

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,8 @@
22
#include "buffer_cache/large_buf.hpp"
33

44
int large_buf_ref::refsize(block_size_t block_size, int64_t size, int64_t offset) {
5-
// We haven't implemented inlining yet.
6-
// return sizeof(large_buf_ref) + sizeof(block_id_t) * large_buf_t::compute_large_buf_ref_size(block_size, size + offset);
7-
8-
return sizeof(large_buf_ref) + sizeof(block_id_t);
5+
// We haven't implemented inlining yet. // TODO probably this comment is out of date.
6+
return sizeof(large_buf_ref) + sizeof(block_id_t) * large_buf_t::compute_large_buf_ref_size(block_size, size + offset);
97
}
108
int large_buf_ref::refsize(block_size_t block_size) const {
119
return large_buf_ref::refsize(block_size, this->size, this->offset);

0 commit comments

Comments
 (0)