forked from google/array_record
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy patharray_record_reader.h
More file actions
396 lines (356 loc) · 14.7 KB
/
array_record_reader.h
File metadata and controls
396 lines (356 loc) · 14.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
/* Copyright 2022 Google LLC. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/
// `ArrayRecordReader` reads a riegeli file written by `ArrayRecordWriter`. It
// supports concurrent readahead, random access by record index, and parallel
// read with callbacks.
//
// The concurrency mechanism can be fiber based or thread pool based. Fiber is
// the preferred default to achieve asynchronous IO. However, user may also
// choose to use the old-school thread pool instead. In the OSS version, only
// thread pool is supported.
//
// Features to be implemented in the future:
// TODO(fchern): Supports field projection for columnar read.
//
// Low priority items. Contact us if you need any of the features below.
// TODO(fchern): Recover file/chunk corruption.
// TODO(fchern): Supports append.
//
#ifndef ARRAY_RECORD_CPP_ARRAY_RECORD_READER_H_
#define ARRAY_RECORD_CPP_ARRAY_RECORD_READER_H_
#include <cstdint>
#include <functional>
#include <future> // NOLINT(build/c++11)
#include <memory>
#include <optional>
#include <queue>
#include <type_traits>
#include <utility>
#include "google/protobuf/message_lite.h"
#include "absl/functional/function_ref.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "cpp/common.h"
#include "cpp/thread_compatible_shared_ptr.h"
#include "cpp/thread_pool.h"
#include "riegeli/base/object.h"
#include "riegeli/bytes/reader.h"
namespace array_record {
// template independent part of ArrayRecordReader
class ArrayRecordReaderBase : public riegeli::Object {
public:
class Options {
public:
Options() {}
// Parses options from text:
// ```
// options ::= option? ("," option?)*
// option ::=
// "readahead_buffer_size" ":" readahead_buffer_size |
// "max_parallelism" ":" max_parallelism
// readahead_buffer_size ::= non-negative integer expressed as real with
// optional suffix [BkKMGTPE]. (Default 16MB). Set to 0 optimizes random
// access performance.
// max_parallelism ::= `auto` or non-negative integer. Each parallel
// thread
// owns its readhaed buffer with the size `readahead_buffer_size`.
// (Default thread pool size) Set to 0 optimizes random access
// performance.
// ```
static absl::StatusOr<Options> FromString(absl::string_view text);
// Readahead speeds up sequential reads, but harms random access. When using
// ArrayRecord for random access, user should configure the buffer size with
// 0.
static constexpr uint64_t kDefaultReadaheadBufferSize = 1L << 24;
Options& set_readahead_buffer_size(uint64_t readahead_buffer_size) {
readahead_buffer_size_ = readahead_buffer_size;
return *this;
}
uint64_t readahead_buffer_size() const { return readahead_buffer_size_; }
// Specifies max number of concurrent readaheads. Setting max_parallelism to
// 0 disables readaheads prefetching.
Options& set_max_parallelism(std::optional<uint32_t> max_parallelism) {
max_parallelism_ = max_parallelism;
return *this;
}
std::optional<uint32_t> max_parallelism() const { return max_parallelism_; }
private:
std::optional<uint32_t> max_parallelism_ = std::nullopt;
uint64_t readahead_buffer_size_ = kDefaultReadaheadBufferSize;
};
// Reads the entire file in parallel and invokes the callback function of
// signature:
//
// uint64_t record_index, absl::string_view record_data -> absl::Status
//
// Return values: status ok for a sucessful read.
absl::Status ParallelReadRecords(
absl::FunctionRef<absl::Status(/*record_index=*/uint64_t,
/*record_data=*/absl::string_view)>
callback) const;
// Reads the entire file in parallel and invokes the callback function of
// signature:
//
// uint64_t record_index, UserProtoType user_proto -> absl::Status
//
// Example:
//
// auto status = reader.ParallelReadRecords<GenericFeatureVector>(
// [&](uint64_t record_index, GenericFeatureVector result_gfv) {
// // do our work
// return absl::OkStatus();
// }))
//
// This method is a handy wrapper for processing protobufs stored in
// ArrayRecord.
//
// Return values: status ok for a sucessful read.
template <typename ProtoT, typename FunctionT,
typename = std::enable_if_t<
std::is_base_of_v<google::protobuf::MessageLite, ProtoT>>>
absl::Status ParallelReadRecords(FunctionT callback) const {
return ParallelReadRecords(
[&](uint64_t record_idx, absl::string_view record) -> absl::Status {
ProtoT record_proto;
// Like ParseFromString(), but accepts messages that are missing
// required fields.
if (!record_proto.ParsePartialFromString(record)) {
return InternalError("Failed to parse. record_idx: %d", record_idx);
}
return callback(record_idx, std::move(record_proto));
});
}
// Reads the records with user supplied indices and invokes the callback
// function of signature:
//
// uint64_t indices_index, absl::string_view record_data -> absl::Status
//
// To obtain the record index, do `indices[indices_index]`.
//
// Return values: status ok for a sucessful read.
absl::Status ParallelReadRecordsWithIndices(
absl::Span<const uint64_t> indices,
absl::FunctionRef<absl::Status(/*indices_index=*/uint64_t,
/*record_data=*/absl::string_view)>
callback) const;
// Reads the records with user supplied indices and invokes the callback
// function of signature:
//
// uint64_t indices_index, UserProtoType user_proto -> absl::Status
//
// This method is a handy wrapper for processing protobufs stored in
// ArrayRecord. To obtain the record index, do `indices[indices_index]`.
//
// Example:
//
// auto status = reader.ParallelReadRecordsWithIndices<GenericFeatureVector>
// ({1, 2, 3},
// [&](uint64_t indices_index, GenericFeatureVector result_gfv) {
// // do our work
// return absl::OkStatus();
// });
//
// Return values: status ok for a sucessful read.
template <typename ProtoT, typename FunctionT,
typename = std::enable_if_t<
std::is_base_of_v<google::protobuf::MessageLite, ProtoT>>>
absl::Status ParallelReadRecordsWithIndices(
absl::Span<const uint64_t> indices, FunctionT callback) const {
return ParallelReadRecordsWithIndices(
indices,
[&](uint64_t indices_idx, absl::string_view record) -> absl::Status {
ProtoT record_proto;
// Like ParseFromString(), but accepts messages that are missing
// required fields.
if (!record_proto.ParsePartialFromString(record)) {
return InternalError("Failed to parse. indices_idx: %d",
indices_idx);
}
return callback(indices_idx, std::move(record_proto));
});
}
// Reads the records with user supplied range and invokes the callback
// function of signature:
//
// uint64_t record_index, absl::string_view record_data -> absl::Status
//
// The specified `begin` and `end` must within the range of the available
// records. `begin` is inclusive, and `end` is exclusive.
//
// Return values: status ok for a sucessful read.
absl::Status ParallelReadRecordsInRange(
uint64_t begin, uint64_t end,
absl::FunctionRef<absl::Status(/*record_index=*/uint64_t,
/*record_data=*/absl::string_view)>
callback) const;
// Reads the records with user supplied range and invokes the callback
// function of signature:
//
// uint64_t record_index, UserProtoType user_proto -> absl::Status
//
// The specified `begin` and `end` must within the range of the available
// records. `begin` is inclusive, and `end` is exclusive.
//
// Return values: status ok for a sucessful read.
template <typename ProtoT, typename FunctionT,
typename = std::enable_if_t<
std::is_base_of_v<google::protobuf::MessageLite, ProtoT>>>
absl::Status ParallelReadRecordsInRange(uint64_t begin, uint64_t end,
FunctionT callback) const {
return ParallelReadRecordsInRange(
begin, end,
[&](uint64_t recrod_index, absl::string_view record) -> absl::Status {
ProtoT record_proto;
// Like ParseFromString(), but accepts messages that are missing
// required fields.
if (!record_proto.ParsePartialFromString(record)) {
return InternalError("Failed to parse. record_index: %d",
recrod_index);
}
return callback(recrod_index, std::move(record_proto));
});
}
// Number of records in the opened file.
uint64_t NumRecords() const;
// Number of records in each compressed chunk configured at the file writing
// stage by `ArrayRecordWriterBase::Options::set_group_size`. The acutal
// number of records per group could be smaller equals to this number.
uint64_t RecordGroupSize() const;
// Index of the record to be read.
uint64_t RecordIndex() const;
// Seek to a particular record index for the next read. This method discards
// read ahead buffers, so users should not rely on this function for
// performance critical random access.
//
// For batch lookup, prefer `ParallelReadRecordsWithIndices` over this method.
//
// For low latency lookup, load all data with `ParallelReadRecords` into user
// defined in-memory data structure then perform the lookup.
bool SeekRecord(uint64_t record_index);
// Reads the next record `RecordIndex()` pointed to.
//
// Return values:
// `true` (when `ok()`, `record` is set) - success
// `false` (when `ok()`) - data ends
// `false` (when `!ok()`) - failure
bool ReadRecord(google::protobuf::MessageLite* record);
// Reads the next record `RecordIndex()` pointed to.
//
// Return values:
// `true` (when `ok()`, `record` is set) - success
// `false` (when `ok()`) - data ends
// `false` (when `!ok()`) - failure
bool ReadRecord(absl::string_view* record);
// Returns the writer options for files produced after 2022-10-10.
std::optional<std::string> WriterOptionsString() const;
uint64_t ChunkStartOffset(uint64_t chunk_idx) const;
uint64_t ChunkEndOffset(uint64_t chunk_idx) const;
protected:
explicit ArrayRecordReaderBase(Options options, ARThreadPool* pool);
~ArrayRecordReaderBase() override;
// Move only. Closes `other` on move.
ArrayRecordReaderBase(ArrayRecordReaderBase&& other) noexcept;
ArrayRecordReaderBase& operator=(ArrayRecordReaderBase&& other) noexcept;
void Initialize();
virtual ThreadCompatibleSharedPtr<riegeli::Reader> get_backing_reader()
const = 0;
private:
bool ReadAheadFromBuffer(uint64_t buffer_idx);
// Holds all the internal state in a variable to simplify the implementation
// of the "Close after Move" semantic.
struct ArrayRecordReaderState;
friend class ChunkDispatcher;
std::unique_ptr<ArrayRecordReaderState> state_;
};
// `ArrayRecordReader` use templated backend abstraction. To read the
// data from a string, user simply write:
//
// std::string src = ...;
// auto reads_from_string =
// ArrayRecordReader<riegeli::StringReader<>>(std::forward_as_tuple(src));
//
// Similarly, user can read the input from a cord or from a file.
//
// absl::Cord cord = ...;
// auto reads_from_cord =
// ArrayRecordReader<riegeli::CordReader<>>(std::forward_as_tuple(cord));
//
// File* file = ...;
// auto writes_to_file =
// ArrayRecordReader<riegeli::FileReader<>>(std::forward_as_tuple(file));
//
// It is necessary to call `Close()` at the end of a successful reading session.
// It is not needed to call `Close()` on early returns, assuming that contents
// of the destination do not matter after all, e.g. because a failure is being
// reported instead; the destructor releases resources in any case.
//
// Error handling example:
//
// // Similar to RET_CHECK and RETURN_OR_ERROR
// if(!reader.ReadRecord(...)) return reader.status();
// // Must close after use.
// if(!reader.Close()) return reader.status();
//
// ArrayRecordReader is thread compatible, not thread-safe.
template <typename Src>
class ArrayRecordReader : public ArrayRecordReaderBase {
public:
DECLARE_MOVE_ONLY_CLASS(ArrayRecordReader);
// Constructor that takes the ownership of the other riegeli reader.
explicit ArrayRecordReader(Src&& src, Options options = Options(),
ARThreadPool* pool = nullptr)
: ArrayRecordReaderBase(std::move(options), pool),
main_reader_(ThreadCompatibleSharedPtr<riegeli::Reader>::Create(
std::move(src))) {
if (!main_reader_->SupportsNewReader()) {
Fail(InvalidArgumentError(
"ArrayRecordReader only work on inputs with random access support."));
return;
}
Initialize();
}
// Constructor that forwards the argument to the underlying riegeli reader and
// owns the reader internally till it closes.
template <typename... SrcArgs>
explicit ArrayRecordReader(std::tuple<SrcArgs...> src_args,
Options options = Options(),
ARThreadPool* pool = nullptr)
: ArrayRecordReaderBase(std::move(options), pool),
main_reader_(ThreadCompatibleSharedPtr<riegeli::Reader>::Create<Src>(
std::move(src_args))) {
if (!main_reader_->ok()) {
Fail(main_reader_->status());
return;
}
if (!main_reader_->SupportsNewReader()) {
Fail(absl::InvalidArgumentError(
"ArrayRecordReader only work on inputs with random access support."));
return;
}
Initialize();
}
protected:
ThreadCompatibleSharedPtr<riegeli::Reader> get_backing_reader()
const override {
return main_reader_;
}
void Done() override {
if (main_reader_.is_owning()) {
// Blocks until all detached readahead fetches finishes.
main_reader_->Close();
}
}
private:
ThreadCompatibleSharedPtr<riegeli::Reader> main_reader_;
};
} // namespace array_record
#endif // ARRAY_RECORD_CPP_ARRAY_RECORD_READER_H_