Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions google/cloud/storage/async/writer_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ class AsyncWriterConnection {

/// Return the request metadata.
virtual RpcMetadata GetRequestMetadata() = 0;

/// Returns the latest write handle, if any.
virtual absl::optional<google::storage::v2::BidiWriteHandle> WriteHandle()
const = 0;
};

/**
Expand Down
6 changes: 6 additions & 0 deletions google/cloud/storage/internal/async/partial_upload.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ void PartialUpload::OnWrite(std::size_t n, bool ok) {
request_.clear_first_message();
request_.clear_flush();
request_.clear_finish_write();
// The `write_object_spec`, `append_object_spec` and `upload_id` fields must
// only be set on the first message of a Write() request. They are cleared by
// `PartialUpload` after the first message is sent.
request_.clear_write_object_spec();
request_.clear_append_object_spec();
request_.clear_upload_id();
request_.set_write_offset(request_.write_offset() + n);
if (!data_.empty()) return Write();
result_.set_value(true);
Expand Down
60 changes: 60 additions & 0 deletions google/cloud/storage/internal/async/partial_upload_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,66 @@ TEST(PartialUpload, ErrorOnChecksums) {
EXPECT_THAT(success, StatusIs(StatusCode::kInvalidArgument));
}

TEST(PartialUpload, ClearsSpecAfterFirstChunk) {
auto generator = google::cloud::internal::DefaultPRNG(std::random_device{}());
auto const buffer = RandomData(generator, 2 * kExpectedChunkSize + 1024);

AsyncSequencer<bool> sequencer;
auto rpc = std::make_unique<MockStream>();
EXPECT_CALL(*rpc, Write)
.WillOnce([&](Request const& request, grpc::WriteOptions wopt) {
EXPECT_FALSE(wopt.is_last_message());
EXPECT_EQ(request.write_offset(), 0);
// The oneof field must be set on the first message.
EXPECT_TRUE(request.has_append_object_spec());
EXPECT_EQ(request.append_object_spec().object(), "test-object");
return sequencer.PushBack("Write");
})
.WillOnce([&](Request const& request, grpc::WriteOptions wopt) {
EXPECT_FALSE(wopt.is_last_message());
EXPECT_EQ(request.write_offset(), kExpectedChunkSize);
// These fields must be cleared after the first message.
EXPECT_FALSE(request.has_write_object_spec());
EXPECT_FALSE(request.has_append_object_spec());
EXPECT_FALSE(request.has_upload_id());
return sequencer.PushBack("Write");
})
.WillOnce([&](Request const& request, grpc::WriteOptions wopt) {
EXPECT_TRUE(wopt.is_last_message());
EXPECT_EQ(request.write_offset(), 2 * kExpectedChunkSize);
// These fields must be cleared after the first message.
EXPECT_FALSE(request.has_write_object_spec());
EXPECT_FALSE(request.has_append_object_spec());
EXPECT_FALSE(request.has_upload_id());
return sequencer.PushBack("Write");
});

auto hash = std::make_unique<storage::internal::Crc32cHashFunction>();
Request request;
request.mutable_append_object_spec()->set_object("test-object");

auto call = PartialUpload::Call(std::move(rpc), std::move(hash), request,
absl::Cord(buffer), PartialUpload::kFinalize);

auto result = call->Start();

auto next = sequencer.PopFrontWithName();
EXPECT_THAT(next.second, "Write");
next.first.set_value(true);

next = sequencer.PopFrontWithName();
EXPECT_THAT(next.second, "Write");
next.first.set_value(true);

next = sequencer.PopFrontWithName();
EXPECT_THAT(next.second, "Write");
next.first.set_value(true);

ASSERT_TRUE(result.is_ready());
auto success = result.get();
EXPECT_THAT(success, IsOkAndHolds(true));
}

} // namespace
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace storage_internal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ class AsyncWriterConnectionBufferedState
return UploadId(std::unique_lock<std::mutex>(mu_));
}

absl::optional<google::storage::v2::BidiWriteHandle> WriteHandle() const {
return Impl(std::unique_lock<std::mutex>(mu_))->WriteHandle();
}

absl::variant<std::int64_t, google::storage::v2::Object> PersistedState()
const {
return Impl(std::unique_lock<std::mutex>(mu_))->PersistedState();
Expand Down Expand Up @@ -618,6 +622,11 @@ class AsyncWriterConnectionBuffered

std::string UploadId() const override { return state_->UploadId(); }

absl::optional<google::storage::v2::BidiWriteHandle> WriteHandle()
const override {
return state_->WriteHandle();
}

absl::variant<std::int64_t, google::storage::v2::Object> PersistedState()
const override {
return state_->PersistedState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ std::string AsyncWriterConnectionFinalized::UploadId() const {
return upload_id_;
}

absl::optional<google::storage::v2::BidiWriteHandle>
AsyncWriterConnectionFinalized::WriteHandle() const {
return absl::nullopt;
}

absl::variant<std::int64_t, google::storage::v2::Object>
AsyncWriterConnectionFinalized::PersistedState() const {
return object_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class AsyncWriterConnectionFinalized
void Cancel() override;

std::string UploadId() const override;
absl::optional<google::storage::v2::BidiWriteHandle> WriteHandle()
const override;
absl::variant<std::int64_t, google::storage::v2::Object> PersistedState()
const override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ AsyncWriterConnectionImpl::MakeRequest() {
auto request = request_;
if (first_request_) {
first_request_ = false;
if (latest_write_handle_.has_value()) {
if (latest_write_handle_.has_value() && request.has_append_object_spec()) {
*request.mutable_append_object_spec()->mutable_write_handle() =
*latest_write_handle_;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ class AsyncWriterConnectionImpl
void Cancel() override { return impl_->Cancel(); }

std::string UploadId() const override;
absl::optional<google::storage::v2::BidiWriteHandle> WriteHandle()
const override {
return latest_write_handle_;
}
absl::variant<std::int64_t, google::storage::v2::Object> PersistedState()
const override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ class AsyncWriterConnectionResumedState
return UploadId(std::unique_lock<std::mutex>(mu_));
}

absl::optional<google::storage::v2::BidiWriteHandle> WriteHandle() const {
std::unique_lock<std::mutex> lk(mu_);
return latest_write_handle_;
}

absl::variant<std::int64_t, google::storage::v2::Object> PersistedState()
const {
return Impl(std::unique_lock<std::mutex>(mu_))->PersistedState();
Expand Down Expand Up @@ -294,6 +299,11 @@ class AsyncWriterConnectionResumedState
}

void OnQuery(std::unique_lock<std::mutex> lk, std::int64_t persisted_size) {
auto handle = impl_->WriteHandle();
if (handle) {
latest_write_handle_ = *std::move(handle);
}

if (persisted_size < buffer_offset_) {
return SetError(
std::move(lk),
Expand Down Expand Up @@ -683,6 +693,11 @@ class AsyncWriterConnectionResumed

std::string UploadId() const override { return state_->UploadId(); }

absl::optional<google::storage::v2::BidiWriteHandle> WriteHandle()
const override {
return state_->WriteHandle();
}

absl::variant<std::int64_t, google::storage::v2::Object> PersistedState()
const override {
return state_->PersistedState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ TEST(WriterConnectionResumed, FlushEmpty) {
auto mock = std::make_unique<MockAsyncWriterConnection>();
EXPECT_CALL(*mock, PersistedState)
.WillRepeatedly(Return(MakePersistedState(0)));
EXPECT_CALL(*mock, WriteHandle).WillRepeatedly(Return(absl::nullopt));
EXPECT_CALL(*mock, Flush).WillRepeatedly([&](auto const& p) {
EXPECT_TRUE(p.payload().empty());
return sequencer.PushBack("Flush").then([](auto) { return Status{}; });
Expand Down Expand Up @@ -209,18 +210,25 @@ TEST(WriteConnectionResumed, FlushNonEmpty) {

EXPECT_CALL(*mock, PersistedState)
.WillRepeatedly(Return(MakePersistedState(0)));
EXPECT_CALL(*mock, WriteHandle).WillRepeatedly(Return(absl::nullopt));
EXPECT_CALL(*mock, Flush)
.WillOnce([&](auto const& p) {
EXPECT_EQ(p.payload(), payload.payload());
return sequencer.PushBack("Flush").then([](auto) { return Status{}; });
return sequencer.PushBack("Flush").then([](auto f) {
if (!f.get()) return TransientError();
return Status{};
});
})
.WillOnce([&](auto const& p) {
EXPECT_TRUE(p.payload().empty());
return sequencer.PushBack("Flush").then([](auto) { return Status{}; });
});
EXPECT_CALL(*mock, Query).WillOnce([&]() {
return sequencer.PushBack("Query").then(
[](auto) -> StatusOr<std::int64_t> { return 1024; });
[](auto f) -> StatusOr<std::int64_t> {
if (!f.get()) return TransientError();
return 1024;
});
});

MockFactory mock_factory;
Expand Down Expand Up @@ -396,6 +404,7 @@ TEST(WriteConnectionResumed, NoConcurrentWritesWhenFlushAndWriteRace) {

EXPECT_CALL(*mock, PersistedState)
.WillRepeatedly(Return(MakePersistedState(0)));
EXPECT_CALL(*mock, WriteHandle).WillRepeatedly(Return(absl::nullopt));
EXPECT_CALL(*mock, Flush(_)).WillRepeatedly([&](auto) {
return sequencer.PushBack("Flush").then([](auto) { return Status{}; });
});
Expand Down Expand Up @@ -523,6 +532,90 @@ TEST(WriteConnectionResumed, WriteHandleAssignmentAfterResume) {
}
}

TEST(WriterConnectionResumed, OnQueryUpdatesWriteHandle) {
AsyncSequencer<bool> sequencer;
auto mock = std::make_unique<MockAsyncWriterConnection>();
auto* mock_ptr = mock.get();

auto initial_request = google::storage::v2::BidiWriteObjectRequest{};
google::storage::v2::BidiWriteObjectResponse first_response;
first_response.mutable_write_handle()->set_handle("initial-handle");

EXPECT_CALL(*mock_ptr, PersistedState)
.WillRepeatedly(Return(MakePersistedState(0)));

google::storage::v2::BidiWriteHandle new_handle;
new_handle.set_handle("updated-handle");
EXPECT_CALL(*mock_ptr, WriteHandle).WillRepeatedly(Return(new_handle));

auto const expected_payload = std::string(1024, 'A');

EXPECT_CALL(*mock_ptr, Flush(_))
.WillOnce([&](auto const& p) {
EXPECT_EQ(p.size(), expected_payload.size());
return sequencer.PushBack("Flush").then([](auto f) {
if (f.get()) return Status{};
return TransientError();
});
})
.WillOnce([&](auto const& p) {
// Ghost flush (internal implementation detail)
EXPECT_TRUE(p.payload().empty());
return sequencer.PushBack("GhostFlush").then([](auto) {
return Status{};
});
});

EXPECT_CALL(*mock_ptr, Query)
.WillOnce([&]() {
return sequencer.PushBack("Query").then(
[](auto f) -> StatusOr<std::int64_t> {
if (!f.get()) return TransientError();
return 1024;
});
})
.WillOnce([&]() {
return sequencer.PushBack("GhostQuery")
.then([](auto) -> StatusOr<std::int64_t> { return 1024; });
});

MockFactory mock_factory;
EXPECT_CALL(mock_factory, Call).Times(0);

auto connection = MakeWriterConnectionResumed(
mock_factory.AsStdFunction(), std::move(mock), initial_request, nullptr,
first_response, Options{});

auto current_handle = connection->WriteHandle();
ASSERT_TRUE(current_handle.has_value());
EXPECT_EQ(current_handle->handle(), "initial-handle");

auto flush =
connection->Flush(storage_experimental::WritePayload(expected_payload));

auto next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Flush");
next.first.set_value(true);

next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Query");
next.first.set_value(true);

next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "GhostFlush");
next.first.set_value(true);

next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "GhostQuery");
next.first.set_value(true);

EXPECT_THAT(flush.get(), StatusIs(StatusCode::kOk));

current_handle = connection->WriteHandle();
ASSERT_TRUE(current_handle.has_value());
EXPECT_EQ(current_handle->handle(), "updated-handle");
}

} // namespace
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace storage_internal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ class AsyncWriterConnectionTracing
return impl_->UploadId();
}

absl::optional<google::storage::v2::BidiWriteHandle> WriteHandle()
const override {
// No tracing, this is a local call without any significant work.
return impl_->WriteHandle();
}

absl::variant<std::int64_t, google::storage::v2::Object> PersistedState()
const override {
// No tracing, this is a local call without any significant work.
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/storage/mocks/mock_async_writer_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class MockAsyncWriterConnection
public:
MOCK_METHOD(void, Cancel, (), (override));
MOCK_METHOD(std::string, UploadId, (), (const, override));
MOCK_METHOD(absl::optional<google::storage::v2::BidiWriteHandle>, WriteHandle,
(), (const, override));
MOCK_METHOD((absl::variant<std::int64_t, google::storage::v2::Object>),
PersistedState, (), (const, override));
MOCK_METHOD(future<Status>, Write, (storage_experimental::WritePayload),
Expand Down