Skip to content

Commit 253b689

Browse files
authored
feat(storage): Update the write handle while performing appendable object upload (#15889)
* feat(storage): Update the write handle while performing appendable object upload * revert unwanted changes * fix the format * remove debug statement * updated the tests
1 parent 0f4b6db commit 253b689

12 files changed

Lines changed: 209 additions & 3 deletions

google/cloud/storage/async/writer_connection.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,10 @@ class AsyncWriterConnection {
121121

122122
/// Return the request metadata.
123123
virtual RpcMetadata GetRequestMetadata() = 0;
124+
125+
/// Returns the latest write handle, if any.
126+
virtual absl::optional<google::storage::v2::BidiWriteHandle> WriteHandle()
127+
const = 0;
124128
};
125129

126130
/**

google/cloud/storage/internal/async/partial_upload.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,12 @@ void PartialUpload::OnWrite(std::size_t n, bool ok) {
7474
request_.clear_first_message();
7575
request_.clear_flush();
7676
request_.clear_finish_write();
77+
// The `write_object_spec`, `append_object_spec` and `upload_id` fields must
78+
// only be set on the first message of a Write() request. They are cleared by
79+
// `PartialUpload` after the first message is sent.
80+
request_.clear_write_object_spec();
81+
request_.clear_append_object_spec();
82+
request_.clear_upload_id();
7783
request_.set_write_offset(request_.write_offset() + n);
7884
if (!data_.empty()) return Write();
7985
result_.set_value(true);

google/cloud/storage/internal/async/partial_upload_test.cc

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,66 @@ TEST(PartialUpload, ErrorOnChecksums) {
515515
EXPECT_THAT(success, StatusIs(StatusCode::kInvalidArgument));
516516
}
517517

518+
TEST(PartialUpload, ClearsSpecAfterFirstChunk) {
519+
auto generator = google::cloud::internal::DefaultPRNG(std::random_device{}());
520+
auto const buffer = RandomData(generator, 2 * kExpectedChunkSize + 1024);
521+
522+
AsyncSequencer<bool> sequencer;
523+
auto rpc = std::make_unique<MockStream>();
524+
EXPECT_CALL(*rpc, Write)
525+
.WillOnce([&](Request const& request, grpc::WriteOptions wopt) {
526+
EXPECT_FALSE(wopt.is_last_message());
527+
EXPECT_EQ(request.write_offset(), 0);
528+
// The oneof field must be set on the first message.
529+
EXPECT_TRUE(request.has_append_object_spec());
530+
EXPECT_EQ(request.append_object_spec().object(), "test-object");
531+
return sequencer.PushBack("Write");
532+
})
533+
.WillOnce([&](Request const& request, grpc::WriteOptions wopt) {
534+
EXPECT_FALSE(wopt.is_last_message());
535+
EXPECT_EQ(request.write_offset(), kExpectedChunkSize);
536+
// These fields must be cleared after the first message.
537+
EXPECT_FALSE(request.has_write_object_spec());
538+
EXPECT_FALSE(request.has_append_object_spec());
539+
EXPECT_FALSE(request.has_upload_id());
540+
return sequencer.PushBack("Write");
541+
})
542+
.WillOnce([&](Request const& request, grpc::WriteOptions wopt) {
543+
EXPECT_TRUE(wopt.is_last_message());
544+
EXPECT_EQ(request.write_offset(), 2 * kExpectedChunkSize);
545+
// These fields must be cleared after the first message.
546+
EXPECT_FALSE(request.has_write_object_spec());
547+
EXPECT_FALSE(request.has_append_object_spec());
548+
EXPECT_FALSE(request.has_upload_id());
549+
return sequencer.PushBack("Write");
550+
});
551+
552+
auto hash = std::make_unique<storage::internal::Crc32cHashFunction>();
553+
Request request;
554+
request.mutable_append_object_spec()->set_object("test-object");
555+
556+
auto call = PartialUpload::Call(std::move(rpc), std::move(hash), request,
557+
absl::Cord(buffer), PartialUpload::kFinalize);
558+
559+
auto result = call->Start();
560+
561+
auto next = sequencer.PopFrontWithName();
562+
EXPECT_THAT(next.second, "Write");
563+
next.first.set_value(true);
564+
565+
next = sequencer.PopFrontWithName();
566+
EXPECT_THAT(next.second, "Write");
567+
next.first.set_value(true);
568+
569+
next = sequencer.PopFrontWithName();
570+
EXPECT_THAT(next.second, "Write");
571+
next.first.set_value(true);
572+
573+
ASSERT_TRUE(result.is_ready());
574+
auto success = result.get();
575+
EXPECT_THAT(success, IsOkAndHolds(true));
576+
}
577+
518578
} // namespace
519579
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
520580
} // namespace storage_internal

google/cloud/storage/internal/async/writer_connection_buffered.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,10 @@ class AsyncWriterConnectionBufferedState
9696
return UploadId(std::unique_lock<std::mutex>(mu_));
9797
}
9898

99+
absl::optional<google::storage::v2::BidiWriteHandle> WriteHandle() const {
100+
return Impl(std::unique_lock<std::mutex>(mu_))->WriteHandle();
101+
}
102+
99103
absl::variant<std::int64_t, google::storage::v2::Object> PersistedState()
100104
const {
101105
return Impl(std::unique_lock<std::mutex>(mu_))->PersistedState();
@@ -618,6 +622,11 @@ class AsyncWriterConnectionBuffered
618622

619623
std::string UploadId() const override { return state_->UploadId(); }
620624

625+
absl::optional<google::storage::v2::BidiWriteHandle> WriteHandle()
626+
const override {
627+
return state_->WriteHandle();
628+
}
629+
621630
absl::variant<std::int64_t, google::storage::v2::Object> PersistedState()
622631
const override {
623632
return state_->PersistedState();

google/cloud/storage/internal/async/writer_connection_finalized.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ std::string AsyncWriterConnectionFinalized::UploadId() const {
4040
return upload_id_;
4141
}
4242

43+
absl::optional<google::storage::v2::BidiWriteHandle>
44+
AsyncWriterConnectionFinalized::WriteHandle() const {
45+
return absl::nullopt;
46+
}
47+
4348
absl::variant<std::int64_t, google::storage::v2::Object>
4449
AsyncWriterConnectionFinalized::PersistedState() const {
4550
return object_;

google/cloud/storage/internal/async/writer_connection_finalized.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ class AsyncWriterConnectionFinalized
5252
void Cancel() override;
5353

5454
std::string UploadId() const override;
55+
absl::optional<google::storage::v2::BidiWriteHandle> WriteHandle()
56+
const override;
5557
absl::variant<std::int64_t, google::storage::v2::Object> PersistedState()
5658
const override;
5759

google/cloud/storage/internal/async/writer_connection_impl.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ AsyncWriterConnectionImpl::MakeRequest() {
172172
auto request = request_;
173173
if (first_request_) {
174174
first_request_ = false;
175-
if (latest_write_handle_.has_value()) {
175+
if (latest_write_handle_.has_value() && request.has_append_object_spec()) {
176176
*request.mutable_append_object_spec()->mutable_write_handle() =
177177
*latest_write_handle_;
178178
}

google/cloud/storage/internal/async/writer_connection_impl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ class AsyncWriterConnectionImpl
5454
void Cancel() override { return impl_->Cancel(); }
5555

5656
std::string UploadId() const override;
57+
absl::optional<google::storage::v2::BidiWriteHandle> WriteHandle()
58+
const override {
59+
return latest_write_handle_;
60+
}
5761
absl::variant<std::int64_t, google::storage::v2::Object> PersistedState()
5862
const override;
5963

google/cloud/storage/internal/async/writer_connection_resumed.cc

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,11 @@ class AsyncWriterConnectionResumedState
103103
return UploadId(std::unique_lock<std::mutex>(mu_));
104104
}
105105

106+
absl::optional<google::storage::v2::BidiWriteHandle> WriteHandle() const {
107+
std::unique_lock<std::mutex> lk(mu_);
108+
return latest_write_handle_;
109+
}
110+
106111
absl::variant<std::int64_t, google::storage::v2::Object> PersistedState()
107112
const {
108113
return Impl(std::unique_lock<std::mutex>(mu_))->PersistedState();
@@ -294,6 +299,11 @@ class AsyncWriterConnectionResumedState
294299
}
295300

296301
void OnQuery(std::unique_lock<std::mutex> lk, std::int64_t persisted_size) {
302+
auto handle = impl_->WriteHandle();
303+
if (handle) {
304+
latest_write_handle_ = *std::move(handle);
305+
}
306+
297307
if (persisted_size < buffer_offset_) {
298308
return SetError(
299309
std::move(lk),
@@ -683,6 +693,11 @@ class AsyncWriterConnectionResumed
683693

684694
std::string UploadId() const override { return state_->UploadId(); }
685695

696+
absl::optional<google::storage::v2::BidiWriteHandle> WriteHandle()
697+
const override {
698+
return state_->WriteHandle();
699+
}
700+
686701
absl::variant<std::int64_t, google::storage::v2::Object> PersistedState()
687702
const override {
688703
return state_->PersistedState();

google/cloud/storage/internal/async/writer_connection_resumed_test.cc

Lines changed: 95 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ TEST(WriterConnectionResumed, FlushEmpty) {
172172
auto mock = std::make_unique<MockAsyncWriterConnection>();
173173
EXPECT_CALL(*mock, PersistedState)
174174
.WillRepeatedly(Return(MakePersistedState(0)));
175+
EXPECT_CALL(*mock, WriteHandle).WillRepeatedly(Return(absl::nullopt));
175176
EXPECT_CALL(*mock, Flush).WillRepeatedly([&](auto const& p) {
176177
EXPECT_TRUE(p.payload().empty());
177178
return sequencer.PushBack("Flush").then([](auto) { return Status{}; });
@@ -209,18 +210,25 @@ TEST(WriteConnectionResumed, FlushNonEmpty) {
209210

210211
EXPECT_CALL(*mock, PersistedState)
211212
.WillRepeatedly(Return(MakePersistedState(0)));
213+
EXPECT_CALL(*mock, WriteHandle).WillRepeatedly(Return(absl::nullopt));
212214
EXPECT_CALL(*mock, Flush)
213215
.WillOnce([&](auto const& p) {
214216
EXPECT_EQ(p.payload(), payload.payload());
215-
return sequencer.PushBack("Flush").then([](auto) { return Status{}; });
217+
return sequencer.PushBack("Flush").then([](auto f) {
218+
if (!f.get()) return TransientError();
219+
return Status{};
220+
});
216221
})
217222
.WillOnce([&](auto const& p) {
218223
EXPECT_TRUE(p.payload().empty());
219224
return sequencer.PushBack("Flush").then([](auto) { return Status{}; });
220225
});
221226
EXPECT_CALL(*mock, Query).WillOnce([&]() {
222227
return sequencer.PushBack("Query").then(
223-
[](auto) -> StatusOr<std::int64_t> { return 1024; });
228+
[](auto f) -> StatusOr<std::int64_t> {
229+
if (!f.get()) return TransientError();
230+
return 1024;
231+
});
224232
});
225233

226234
MockFactory mock_factory;
@@ -396,6 +404,7 @@ TEST(WriteConnectionResumed, NoConcurrentWritesWhenFlushAndWriteRace) {
396404

397405
EXPECT_CALL(*mock, PersistedState)
398406
.WillRepeatedly(Return(MakePersistedState(0)));
407+
EXPECT_CALL(*mock, WriteHandle).WillRepeatedly(Return(absl::nullopt));
399408
EXPECT_CALL(*mock, Flush(_)).WillRepeatedly([&](auto) {
400409
return sequencer.PushBack("Flush").then([](auto) { return Status{}; });
401410
});
@@ -523,6 +532,90 @@ TEST(WriteConnectionResumed, WriteHandleAssignmentAfterResume) {
523532
}
524533
}
525534

535+
TEST(WriterConnectionResumed, OnQueryUpdatesWriteHandle) {
536+
AsyncSequencer<bool> sequencer;
537+
auto mock = std::make_unique<MockAsyncWriterConnection>();
538+
auto* mock_ptr = mock.get();
539+
540+
auto initial_request = google::storage::v2::BidiWriteObjectRequest{};
541+
google::storage::v2::BidiWriteObjectResponse first_response;
542+
first_response.mutable_write_handle()->set_handle("initial-handle");
543+
544+
EXPECT_CALL(*mock_ptr, PersistedState)
545+
.WillRepeatedly(Return(MakePersistedState(0)));
546+
547+
google::storage::v2::BidiWriteHandle new_handle;
548+
new_handle.set_handle("updated-handle");
549+
EXPECT_CALL(*mock_ptr, WriteHandle).WillRepeatedly(Return(new_handle));
550+
551+
auto const expected_payload = std::string(1024, 'A');
552+
553+
EXPECT_CALL(*mock_ptr, Flush(_))
554+
.WillOnce([&](auto const& p) {
555+
EXPECT_EQ(p.size(), expected_payload.size());
556+
return sequencer.PushBack("Flush").then([](auto f) {
557+
if (f.get()) return Status{};
558+
return TransientError();
559+
});
560+
})
561+
.WillOnce([&](auto const& p) {
562+
// Ghost flush (internal implementation detail)
563+
EXPECT_TRUE(p.payload().empty());
564+
return sequencer.PushBack("GhostFlush").then([](auto) {
565+
return Status{};
566+
});
567+
});
568+
569+
EXPECT_CALL(*mock_ptr, Query)
570+
.WillOnce([&]() {
571+
return sequencer.PushBack("Query").then(
572+
[](auto f) -> StatusOr<std::int64_t> {
573+
if (!f.get()) return TransientError();
574+
return 1024;
575+
});
576+
})
577+
.WillOnce([&]() {
578+
return sequencer.PushBack("GhostQuery")
579+
.then([](auto) -> StatusOr<std::int64_t> { return 1024; });
580+
});
581+
582+
MockFactory mock_factory;
583+
EXPECT_CALL(mock_factory, Call).Times(0);
584+
585+
auto connection = MakeWriterConnectionResumed(
586+
mock_factory.AsStdFunction(), std::move(mock), initial_request, nullptr,
587+
first_response, Options{});
588+
589+
auto current_handle = connection->WriteHandle();
590+
ASSERT_TRUE(current_handle.has_value());
591+
EXPECT_EQ(current_handle->handle(), "initial-handle");
592+
593+
auto flush =
594+
connection->Flush(storage_experimental::WritePayload(expected_payload));
595+
596+
auto next = sequencer.PopFrontWithName();
597+
EXPECT_EQ(next.second, "Flush");
598+
next.first.set_value(true);
599+
600+
next = sequencer.PopFrontWithName();
601+
EXPECT_EQ(next.second, "Query");
602+
next.first.set_value(true);
603+
604+
next = sequencer.PopFrontWithName();
605+
EXPECT_EQ(next.second, "GhostFlush");
606+
next.first.set_value(true);
607+
608+
next = sequencer.PopFrontWithName();
609+
EXPECT_EQ(next.second, "GhostQuery");
610+
next.first.set_value(true);
611+
612+
EXPECT_THAT(flush.get(), StatusIs(StatusCode::kOk));
613+
614+
current_handle = connection->WriteHandle();
615+
ASSERT_TRUE(current_handle.has_value());
616+
EXPECT_EQ(current_handle->handle(), "updated-handle");
617+
}
618+
526619
} // namespace
527620
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
528621
} // namespace storage_internal

0 commit comments

Comments
 (0)