feat(fs): support int64 file IO sizes#326
Conversation
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 75 out of 75 changed files in this pull request and generated 35 comments.
Comments suppressed due to low confidence (8)
src/paimon/common/utils/arrow/arrow_input_stream_adapter.cpp:117
- Tell()/GetSize() return int64_t to Arrow; converting uint64_t positions/sizes with a plain cast can overflow and yield negative values when the underlying stream exceeds INT64_MAX.
src/paimon/common/io/data_input_stream.cpp:110 - DataInputStream::AssertBoundary uses
pos + need_length > lengthon uint64_t values; this addition can overflow and incorrectly pass the boundary check for very large need_length values. Use a subtraction-based check to avoid overflow.
Status DataInputStream::AssertBoundary(uint64_t need_length) const {
// TODO(jinli.zjw): Store current_pos and file_length as member variables to reduce the overhead
// of I/O calls.
PAIMON_ASSIGN_OR_RAISE(uint64_t pos, input_stream_->GetPos());
PAIMON_ASSIGN_OR_RAISE(uint64_t length, input_stream_->Length());
if (pos + need_length > length) {
return Status::Invalid(
fmt::format("DataInputStream assert boundary failed: need length {}, current position "
"{}, exceed length {}",
need_length, pos, length));
}
return Status::OK();
src/paimon/common/utils/arrow/arrow_input_stream_adapter.cpp:117
- ArrowInputStreamAdapter::Tell()/GetSize() return int64_t to Arrow; casting uint64_t positions/sizes without a bounds check can overflow and return negative values when the stream exceeds INT64_MAX.
src/paimon/common/data/serializer/binary_row_serializer.cpp:52 - BinaryRowSerializer::Deserialize should validate read_length before casting to uint64_t for allocation; otherwise a negative length becomes a huge allocation attempt.
Result<BinaryRow> BinaryRowSerializer::Deserialize(DataInputStream* source) const {
BinaryRow row(num_fields_);
PAIMON_ASSIGN_OR_RAISE(int32_t read_length, source->ReadValue<int32_t>());
std::shared_ptr<Bytes> bytes =
Bytes::AllocateBytes(static_cast<uint64_t>(read_length), pool_.get());
PAIMON_RETURN_NOT_OK(source->ReadBytes(bytes.get()));
row.PointTo(MemorySegment::Wrap(bytes), 0, read_length);
return row;
}
src/paimon/common/io/data_input_stream.cpp:110
- DataInputStream::AssertBoundary uses
pos + need_length > lengthwith uint64_t values;pos + need_lengthcan overflow and incorrectly pass the boundary check. Use a subtraction-based check to avoid overflow.
Status DataInputStream::AssertBoundary(uint64_t need_length) const {
// TODO(jinli.zjw): Store current_pos and file_length as member variables to reduce the overhead
// of I/O calls.
PAIMON_ASSIGN_OR_RAISE(uint64_t pos, input_stream_->GetPos());
PAIMON_ASSIGN_OR_RAISE(uint64_t length, input_stream_->Length());
if (pos + need_length > length) {
return Status::Invalid(
fmt::format("DataInputStream assert boundary failed: need length {}, current position "
"{}, exceed length {}",
need_length, pos, length));
}
return Status::OK();
src/paimon/common/utils/arrow/arrow_input_stream_adapter.cpp:117
- ArrowInputStreamAdapter::Tell()/GetSize() return int64_t to Arrow; casting uint64_t positions/sizes without a bounds check can overflow and return negative values when the stream exceeds INT64_MAX.
src/paimon/common/data/serializer/binary_row_serializer.cpp:52 - BinaryRowSerializer::Deserialize should validate read_length before casting to uint64_t for allocation; otherwise a negative length becomes a huge allocation attempt.
Result<BinaryRow> BinaryRowSerializer::Deserialize(DataInputStream* source) const {
BinaryRow row(num_fields_);
PAIMON_ASSIGN_OR_RAISE(int32_t read_length, source->ReadValue<int32_t>());
std::shared_ptr<Bytes> bytes =
Bytes::AllocateBytes(static_cast<uint64_t>(read_length), pool_.get());
PAIMON_RETURN_NOT_OK(source->ReadBytes(bytes.get()));
row.PointTo(MemorySegment::Wrap(bytes), 0, read_length);
return row;
}
src/paimon/common/io/data_input_stream.cpp:110
- DataInputStream::AssertBoundary uses
pos + need_length > lengthwith uint64_t values;pos + need_lengthcan overflow and incorrectly pass the boundary check. Use a subtraction-based check to avoid overflow.
Status DataInputStream::AssertBoundary(uint64_t need_length) const {
// TODO(jinli.zjw): Store current_pos and file_length as member variables to reduce the overhead
// of I/O calls.
PAIMON_ASSIGN_OR_RAISE(uint64_t pos, input_stream_->GetPos());
PAIMON_ASSIGN_OR_RAISE(uint64_t length, input_stream_->Length());
if (pos + need_length > length) {
return Status::Invalid(
fmt::format("DataInputStream assert boundary failed: need length {}, current position "
"{}, exceed length {}",
need_length, pos, length));
}
return Status::OK();
| arrow::Result<int64_t> ArrowInputStreamAdapter::Read(int64_t nbytes, void* out) { | ||
| ARROW_RETURN_NOT_OK(ValidateArrowIoRange<uint32_t>(nbytes, "nbytes")); | ||
| Result<int32_t> read_bytes = | ||
| input_stream_->Read(static_cast<char*>(out), static_cast<uint32_t>(nbytes)); | ||
| Result<uint64_t> read_bytes = | ||
| input_stream_->Read(static_cast<char*>(out), static_cast<uint64_t>(nbytes)); | ||
| if (!read_bytes.ok()) { | ||
| return ToArrowStatus(read_bytes.status()); | ||
| } | ||
| return read_bytes.value(); | ||
| return static_cast<int64_t>(read_bytes.value()); | ||
| } |
| arrow::Result<int64_t> ArrowInputStreamAdapter::ReadAt(int64_t position, int64_t nbytes, | ||
| void* out) { | ||
| ARROW_RETURN_NOT_OK(ValidateArrowIoRange<uint64_t>(position, "position")); | ||
| ARROW_RETURN_NOT_OK(ValidateArrowIoRange<uint32_t>(nbytes, "nbytes")); | ||
| Result<int32_t> read_bytes = input_stream_->Read( | ||
| static_cast<char*>(out), static_cast<uint32_t>(nbytes), static_cast<uint64_t>(position)); | ||
| Result<uint64_t> read_bytes = input_stream_->Read( | ||
| static_cast<char*>(out), static_cast<uint64_t>(nbytes), static_cast<uint64_t>(position)); | ||
| if (!read_bytes.ok()) { | ||
| return ToArrowStatus(read_bytes.status()); | ||
| } | ||
| return read_bytes.value(); | ||
| return static_cast<int64_t>(read_bytes.value()); | ||
| } |
| arrow::Future<std::shared_ptr<arrow::Buffer>> ArrowInputStreamAdapter::ReadAsync( | ||
| const arrow::io::IOContext& io_context, int64_t position, int64_t nbytes) { | ||
| auto fut = arrow::Future<std::shared_ptr<arrow::Buffer>>::Make(); | ||
| auto range_status = ValidateArrowIoRange<uint64_t>(position, "position"); | ||
| if (!range_status.ok()) { | ||
| fut.MarkFinished(range_status); | ||
| return fut; | ||
| } | ||
| range_status = ValidateArrowIoRange<uint32_t>(nbytes, "nbytes"); | ||
| if (!range_status.ok()) { | ||
| fut.MarkFinished(range_status); | ||
| return fut; | ||
| } | ||
|
|
||
| arrow::Result<std::shared_ptr<arrow::Buffer>> buffer_result = | ||
| arrow::AllocateResizableBuffer(nbytes, pool_.get()); | ||
| if (PAIMON_UNLIKELY(!buffer_result.ok())) { | ||
| fut.MarkFinished(buffer_result.status()); | ||
| return fut; | ||
| } | ||
| std::shared_ptr<arrow::Buffer> buffer = std::move(buffer_result).ValueUnsafe(); | ||
| input_stream_->ReadAsync(reinterpret_cast<char*>(buffer->mutable_data()), | ||
| static_cast<uint32_t>(nbytes), static_cast<uint64_t>(position), | ||
| static_cast<uint64_t>(nbytes), static_cast<uint64_t>(position), | ||
| [fut, buffer](Status callback_status) mutable { | ||
| if (callback_status.ok()) { | ||
| fut.MarkFinished(std::move(buffer)); | ||
| } else { | ||
| fut.MarkFinished(ToArrowStatus(callback_status)); | ||
| } | ||
| }); | ||
| return fut; | ||
| } |
| arrow::Status ArrowOutputStreamAdapter::Write(const void* data, int64_t nbytes) { | ||
| if (!InRange<uint32_t>(nbytes)) { | ||
| return arrow::Status::Invalid( | ||
| fmt::format("nbytes value {} is out of bound of uint32_t", nbytes)); | ||
| } | ||
| Result<int32_t> len = | ||
| out_->Write(static_cast<const char*>(data), static_cast<uint32_t>(nbytes)); | ||
| Result<uint64_t> len = | ||
| out_->Write(static_cast<const char*>(data), static_cast<uint64_t>(nbytes)); | ||
| if (!len.ok()) { | ||
| return ToArrowStatus(len.status()); | ||
| } | ||
| if (len.value() != static_cast<uint64_t>(nbytes)) { | ||
| return arrow::Status::IOError( | ||
| fmt::format("expect write len {} mismatch actual write len {}", nbytes, len.value())); | ||
| } | ||
| return arrow::Status::OK(); |
| arrow::Result<int64_t> ArrowOutputStreamAdapter::Tell() const { | ||
| paimon::Result<int64_t> pos = out_->GetPos(); | ||
| paimon::Result<uint64_t> pos = out_->GetPos(); | ||
| if (!pos.ok()) { | ||
| return ToArrowStatus(pos.status()); | ||
| } | ||
| return pos.value(); | ||
| return static_cast<int64_t>(pos.value()); | ||
| } |
| PAIMON_ASSIGN_OR_RAISE(uint64_t pos, out->GetPos()); | ||
| PAIMON_RETURN_NOT_OK(out->Close()); | ||
| guard.Release(); | ||
| return std::make_pair(PathUtil::GetName(file_path), pos); | ||
| return std::make_pair(PathUtil::GetName(file_path), static_cast<int64_t>(pos)); |
| Result<uint64_t> OffsetInputStream::Read(char* buffer, uint64_t size) { | ||
| PAIMON_RETURN_NOT_OK(AssertBoundary(static_cast<int64_t>(inner_position_ + size))); | ||
| PAIMON_ASSIGN_OR_RAISE(uint64_t actual_read_len, wrapped_->Read(buffer, size)); | ||
| inner_position_ += actual_read_len; | ||
| return actual_read_len; | ||
| } |
| Result<uint64_t> OffsetInputStream::Read(char* buffer, uint64_t size, uint64_t offset) { | ||
| PAIMON_RETURN_NOT_OK(AssertBoundary(static_cast<int64_t>(offset + size))); | ||
| return wrapped_->Read(buffer, size, static_cast<uint64_t>(offset_) + offset); | ||
| } | ||
|
|
||
| void OffsetInputStream::ReadAsync(char* buffer, uint32_t size, uint64_t offset, | ||
| void OffsetInputStream::ReadAsync(char* buffer, uint64_t size, uint64_t offset, | ||
| std::function<void(Status)>&& callback) { | ||
| auto status = AssertBoundary(offset); | ||
| auto status = AssertBoundary(static_cast<int64_t>(offset + size)); | ||
| if (!status.ok()) { | ||
| callback(status); | ||
| return; | ||
| } | ||
| status = AssertBoundary(offset + size); | ||
| if (!status.ok()) { | ||
| callback(status); | ||
| return; | ||
| } | ||
| wrapped_->ReadAsync(buffer, size, offset_ + offset, std::move(callback)); | ||
| wrapped_->ReadAsync(buffer, size, static_cast<uint64_t>(offset_) + offset, std::move(callback)); | ||
| } |
| Result<uint64_t> AvroFileBatchReader::GetNumberOfRows() const { | ||
| if (!total_rows_) { | ||
| PAIMON_ASSIGN_OR_RAISE(int64_t current_pos, input_stream_->GetPos()); | ||
| ScopeGuard stream_guard([this, current_pos]() -> void { | ||
| PAIMON_ASSIGN_OR_RAISE(uint64_t current_pos, input_stream_->GetPos()); | ||
| auto seek_pos = static_cast<int64_t>(current_pos); | ||
| ScopeGuard stream_guard([this, seek_pos]() -> void { | ||
| // reset input stream position to original position | ||
| Status status = input_stream_->Seek(current_pos, SeekOrigin::FS_SEEK_SET); | ||
| Status status = input_stream_->Seek(seek_pos, SeekOrigin::FS_SEEK_SET); | ||
| (void)status; |
| PAIMON_ASSIGN_OR_RAISE(uint64_t pos, out->GetPos()); | ||
| PAIMON_RETURN_NOT_OK(out->Close()); | ||
| guard.Release(); | ||
| return std::make_pair(PathUtil::GetName(file_path), pos); | ||
| return std::make_pair(PathUtil::GetName(file_path), static_cast<int64_t>(pos)); |
94e7101 to
89ec1c2
Compare
| PAIMON_ASSIGN_OR_RAISE(int64_t total_length, file->Length()); | ||
| if (PAIMON_UNLIKELY(blob_offset > total_length)) { | ||
| return Status::Invalid( | ||
| fmt::format("offset {} exceed total length {}", blob_offset, total_length)); | ||
| } |
| in_stream_->GetInput()->Read(reinterpret_cast<char*>(b + offset), length, position); | ||
| if (!read_result.ok()) { | ||
| throw Lucene::IOException( | ||
| LuceneUtils::StringToWstring(read_result.status().ToString())); | ||
| } | ||
| if (read_result.value() != length) { | ||
| if (static_cast<int32_t>(read_result.value()) != length) { | ||
| throw Lucene::IOException(L"actual read len and expect read len mismatch"); | ||
| } |
| PAIMON_RETURN_NOT_OK(input_stream->Seek(0, FS_SEEK_SET)); | ||
| PAIMON_ASSIGN_OR_RAISE(uint64_t file_length, input_stream->Length()); | ||
| PAIMON_ASSIGN_OR_RAISE(int64_t file_length, input_stream->Length()); | ||
| PAIMON_UNIQUE_PTR<Bytes> content = Bytes::AllocateBytes(file_length, pool.get()); | ||
| PAIMON_ASSIGN_OR_RAISE(int32_t actual_read_len, | ||
| input_stream->Read(content->data(), content->size())); | ||
| if (static_cast<uint32_t>(actual_read_len) != file_length) { | ||
| PAIMON_ASSIGN_OR_RAISE(int64_t actual_read_len, | ||
| input_stream->Read(content->data(), file_length)); | ||
| if (actual_read_len != file_length) { |
| static Result<PAIMON_UNIQUE_PTR<Bytes>> ReadAsyncFully( | ||
| std::unique_ptr<InputStream> input_stream, const std::shared_ptr<MemoryPool>& pool) { | ||
| PAIMON_ASSIGN_OR_RAISE(uint64_t file_length, input_stream->Length()); | ||
| PAIMON_ASSIGN_OR_RAISE(int64_t file_length, input_stream->Length()); | ||
| PAIMON_UNIQUE_PTR<Bytes> content = Bytes::AllocateBytes(file_length, pool.get()); | ||
| PAIMON_RETURN_NOT_OK(ReadAsyncFully(std::move(input_stream), content->data())); | ||
| return content; |
| static Status ReadAsyncFully(std::unique_ptr<InputStream> input_stream, char* content) { | ||
| PAIMON_RETURN_NOT_OK(input_stream->Seek(0, FS_SEEK_SET)); | ||
| PAIMON_ASSIGN_OR_RAISE(uint64_t file_length, input_stream->Length()); | ||
| PAIMON_ASSIGN_OR_RAISE(int64_t file_length, input_stream->Length()); | ||
|
|
||
| uint64_t read_offset = 0; | ||
| uint32_t read_len = std::min(file_length, kDefaultReadChunkSize); | ||
| int64_t read_offset = 0; | ||
| int64_t read_len = std::min(file_length, kDefaultReadChunkSize); | ||
| std::vector<std::future<Status>> futures; | ||
| futures.reserve(file_length / kDefaultReadChunkSize + 1); | ||
| while (read_len > 0) { |
| PAIMON_ASSIGN_OR_RAISE(int64_t length, in->Length()); | ||
| content->resize(static_cast<size_t>(length)); | ||
| PAIMON_ASSIGN_OR_RAISE(int64_t read_length, in->Read(content->data(), length)); | ||
| if (read_length != length) { |
| #include <functional> | ||
| #include <memory> | ||
| #include <utility> | ||
|
|
| /// @param pool Memory pool to use for allocation. | ||
| /// @return Unique pointer to the newly allocated Bytes object. | ||
| static PAIMON_UNIQUE_PTR<Bytes> AllocateBytes(int32_t length, MemoryPool* pool); | ||
| static PAIMON_UNIQUE_PTR<Bytes> AllocateBytes(size_t length, MemoryPool* pool); |
| PAIMON_ASSIGN_OR_RAISE(int32_t read_length, in->Read(content->data(), length)); | ||
| if (read_length != static_cast<int32_t>(length)) { | ||
| PAIMON_ASSIGN_OR_RAISE(int64_t length, in->Length()); | ||
| content->resize(static_cast<size_t>(length)); |
There was a problem hiding this comment.
check length not negative?
| int32_t avail = count_ - pos_; | ||
| if (avail <= 0) { | ||
| assert(avail == 0); | ||
| assert(pos_ <= count_); |
There was a problem hiding this comment.
check and return status?
|
|
||
| /// Allocate bytes in pool | ||
| static std::shared_ptr<Bytes> AllocateBytes(int32_t length, MemoryPool* pool); | ||
| static std::shared_ptr<Bytes> AllocateBytes(size_t length, MemoryPool* pool); |
| avail = count_ - pos_; | ||
| if (avail <= 0) { | ||
| if (avail == 0) { | ||
| return Status::Invalid(fmt::format( |
| return Status::Invalid( | ||
| fmt::format("ByteArrayInputStream assert boundary failed: need length {}, read offset " | ||
| "{}, exceed length {}", | ||
| size, offset, length_)); |
| if (pos + need_length > static_cast<int64_t>(length)) { | ||
| PAIMON_ASSIGN_OR_RAISE(int64_t length, input_stream_->Length()); | ||
| if (pos > length || need_length > length - pos) { | ||
| return Status::Invalid( |
| PAIMON_RETURN_NOT_OK(ValidateValueNonNegative(offset, "offset")); | ||
| PAIMON_RETURN_NOT_OK(ValidateValueNonNegative(length, "length")); | ||
| PAIMON_ASSIGN_OR_RAISE(int64_t total_length, wrapped->Length()); | ||
| return Create(wrapped, length, offset, total_length); |
There was a problem hiding this comment.
rm PAIMON_RETURN_NOT_OK(ValidateValueNonNegative(offset, "offset"));
| inner_position_ += size; | ||
| return wrapped_->Read(buffer, size); | ||
| PAIMON_ASSIGN_OR_RAISE(int64_t actual_read_len, wrapped_->Read(buffer, size)); | ||
| inner_position_ += actual_read_len; |
There was a problem hiding this comment.
check actual_read_len==size.
| ~MemorySegmentUtils() = delete; | ||
|
|
||
| /// Allocate bytes in pool | ||
| static std::shared_ptr<Bytes> AllocateBytes(int32_t length, MemoryPool* pool); |
| PAIMON_ASSIGN_OR_RAISE(int64_t current_pos, input_stream_->GetPos()); | ||
| ScopeGuard stream_guard([this, current_pos]() -> void { | ||
| auto seek_pos = current_pos; | ||
| ScopeGuard stream_guard([this, seek_pos]() -> void { |
| return Status::IOError( | ||
| fmt::format("write file '{}' fail at off {}, wrote zero bytes, ec: {}", path_, | ||
| off, std::strerror(errno))); | ||
| } |
There was a problem hiding this comment.
Why is there a special case here specifically?
| in_stream_->GetInput()->Read(reinterpret_cast<char*>(b + offset), length, position); | ||
| if (!read_result.ok()) { | ||
| throw Lucene::IOException( | ||
| LuceneUtils::StringToWstring(read_result.status().ToString())); | ||
| } | ||
| if (read_result.value() != length) { | ||
| if (static_cast<int32_t>(read_result.value()) != length) { | ||
| throw Lucene::IOException(L"actual read len and expect read len mismatch"); | ||
| } |
Purpose
Linked issue: N/A
This PR updates file system and stream IO size contracts from 32-bit sizes to
int64_t, includingRead,ReadAsync,Write,GetPos,Tell,Length, and related local/Jindo implementations.It also simplifies call sites that previously looped or added checks only to bridge 32-bit IO boundaries, while keeping validation where values still cross into narrower or external API types. Blob subrange streams now pass the known file length into
OffsetInputStream::CreatesoBlob::NewInputStreamdoes not trigger a duplicate length lookup throughOffsetInputStream.Tests
cmake --build build -j64ctest -j64withLSAN_OPTIONS=detect_leaks=0cmake --build build --target paimon-common-test -j64LSAN_OPTIONS=detect_leaks=0 ./build/debug/paimon-common-test --gtest_filter=BlobTest.*:OffsetInputStreamTest.*API and Format
This changes public and internal C++ IO API signatures in
include/paimon/fs/file_system.hand related stream interfaces to useint64_tfor sizes and positions.No storage format or protocol change is intended.
Documentation
No.
Generative AI tooling
Generated-by: OpenAI Codex