Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions google/cloud/storage/examples/storage_async_samples.cc
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,7 @@ std::string SuspendBufferedUpload(
google::cloud::storage_experimental::AsyncClient&,
std::vector<std::string> const&) {
std::cerr
<< "AsyncClient::StartBufferedUpload() example requires coroutines\n";
<< "AsyncClient::SuspendBufferedUpload() example requires coroutines\n";
Comment thread
shubham-up-47 marked this conversation as resolved.
return {};
}

Expand All @@ -871,7 +871,7 @@ std::string SuspendUnbufferedUpload(
google::cloud::storage_experimental::AsyncClient&,
std::vector<std::string> const&) {
std::cerr
<< "AsyncClient::StartUnbufferedUpload() example requires coroutines\n";
<< "AsyncClient::SuspendUnbufferedUpload() example requires coroutines\n";
Comment thread
shubham-up-47 marked this conversation as resolved.
return {};
}

Expand Down Expand Up @@ -1122,7 +1122,7 @@ void AutoRun(std::vector<std::string> const& argv) {
auto upload_id = SuspendBufferedUpload(client, {bucket_name, object_name});

std::cout << "Running the ResumeBufferedUpload() example" << std::endl;
ResumeUnbufferedUpload(client, {upload_id});
ResumeBufferedUpload(client, {upload_id});
scheduled_for_delete.push_back(std::move(object_name));
object_name = examples::MakeRandomObjectName(generator, "object-");

Expand Down Expand Up @@ -1259,7 +1259,7 @@ int main(int argc, char* argv[]) try {
make_resume_entry("resume-buffered-upload", {}, ResumeBufferedUpload),

make_entry("start-unbuffered-upload", {"<filename>"},
StartBufferedUpload),
StartUnbufferedUpload),
make_entry("suspend-unbuffered-upload", {}, SuspendUnbufferedUpload),
make_resume_entry("resume-unbuffered-upload", {"<filename>"},
ResumeUnbufferedUpload),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ using ::google::cloud::storage::testing::MockAsyncBidiWriteObjectStream;
using ::google::cloud::testing_util::AsyncSequencer;
using ::google::cloud::testing_util::IsProtoEqual;
using ::google::cloud::testing_util::MockCompletionQueueImpl;
using ::google::storage::v2::BidiWriteObjectRequest;

using AsyncBidiWriteObjectStream = ::google::cloud::AsyncStreamingReadWriteRpc<
google::storage::v2::BidiWriteObjectRequest,
Expand Down Expand Up @@ -487,25 +488,43 @@ TEST_P(AsyncConnectionImplUploadHashTest, StartBuffered) {
});
EXPECT_CALL(*stream, Write)
.WillOnce(
[&](google::storage::v2::BidiWriteObjectRequest const& request,
grpc::WriteOptions wopt) {
EXPECT_EQ(request.upload_id(), "test-upload-id");
EXPECT_TRUE(request.finish_write());
EXPECT_THAT(request.object_checksums(),
IsProtoEqual(expected_checksums));
EXPECT_TRUE(wopt.is_last_message());
return sequencer.PushBack("Write");
});
EXPECT_CALL(*stream, Read).WillOnce([&] {
return sequencer.PushBack("Read").then([](auto) {
auto response = google::storage::v2::BidiWriteObjectResponse{};
response.mutable_resource()->set_bucket(
"projects/_/buckets/test-bucket");
response.mutable_resource()->set_name("test-object");
response.mutable_resource()->set_generation(123456);
return absl::make_optional(std::move(response));
});
});
[&](BidiWriteObjectRequest const& request, grpc::WriteOptions) {
EXPECT_FALSE(request.finish_write());
return sequencer.PushBack("Write(1)");
})
.WillOnce([&](BidiWriteObjectRequest const& request,
grpc::WriteOptions wopt) {
EXPECT_FALSE(request.has_upload_id());
EXPECT_TRUE(request.finish_write());
EXPECT_THAT(request.object_checksums(),
IsProtoEqual(expected_checksums));
EXPECT_TRUE(wopt.is_last_message());
return sequencer.PushBack("Write(2)");
});
EXPECT_CALL(*stream, Read)
.WillOnce([&]() {
return sequencer.PushBack("Read(1)").then(
[](auto f) -> absl::optional<
google::storage::v2::BidiWriteObjectResponse> {
if (!f.get()) return absl::nullopt;
auto response = google::storage::v2::BidiWriteObjectResponse{};
response.set_persisted_size(43);
return absl::make_optional(std::move(response));
});
})
.WillOnce([&]() {
return sequencer.PushBack("Read(2)").then(
[](auto f) -> absl::optional<
google::storage::v2::BidiWriteObjectResponse> {
if (!f.get()) return absl::nullopt;
auto response = google::storage::v2::BidiWriteObjectResponse{};
response.mutable_resource()->set_bucket(
"projects/_/buckets/test-bucket");
response.mutable_resource()->set_name("test-object");
response.mutable_resource()->set_generation(123456);
return absl::make_optional(std::move(response));
});
});
EXPECT_CALL(*stream, Cancel).Times(1);
EXPECT_CALL(*stream, Finish).WillOnce([&] {
return sequencer.PushBack("Finish").then([](auto) { return Status{}; });
Expand Down Expand Up @@ -543,11 +562,19 @@ TEST_P(AsyncConnectionImplUploadHashTest, StartBuffered) {
EXPECT_EQ(absl::get<std::int64_t>(writer->PersistedState()), 0);

auto w2 = writer->Finalize(storage_experimental::WritePayload(kQuickFox));
// The `Finalize()` call triggers a `Flush()` first.
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Write");
EXPECT_EQ(next.second, "Write(1)");
next.first.set_value(true);

next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Read");
EXPECT_EQ(next.second, "Read(1)");
next.first.set_value(true);
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Write(2)");
next.first.set_value(true);
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Read(2)");
next.first.set_value(true);

auto response = w2.get();
Expand Down
180 changes: 51 additions & 129 deletions google/cloud/storage/internal/async/connection_impl_upload_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -940,195 +940,117 @@ TEST_F(AsyncConnectionImplTest, BufferedUploadNewUpload) {
}

TEST_F(AsyncConnectionImplTest, ResumeBufferedUploadNewUploadResume) {
auto constexpr kExpectedRequest = R"pb(
write_object_spec {
resource {
bucket: "projects/_/buckets/test-bucket"
name: "test-object"
content_type: "text/plain"
}
if_generation_match: 123
}
)pb";

AsyncSequencer<bool> sequencer;
auto mock = std::make_shared<storage::testing::MockStorageStub>();
EXPECT_CALL(*mock, AsyncStartResumableWrite)
.WillOnce([&] {
return sequencer.PushBack("StartResumableWrite(1)").then([](auto) {
return StatusOr<google::storage::v2::StartResumableWriteResponse>(
TransientError());
});
})
.WillOnce(
[&](auto&, auto, auto,
google::storage::v2::StartResumableWriteRequest const& request) {
auto expected = google::storage::v2::StartResumableWriteRequest{};
EXPECT_TRUE(
TextFormat::ParseFromString(kExpectedRequest, &expected));
EXPECT_THAT(request, IsProtoEqual(expected));

return sequencer.PushBack("StartResumableWrite(2)").then([](auto) {
auto response =
google::storage::v2::StartResumableWriteResponse{};
response.set_upload_id("test-upload-id");
return make_status_or(response);
});
});
EXPECT_CALL(*mock, AsyncQueryWriteStatus)
.WillOnce([&] {
return sequencer.PushBack("QueryWriteStatus(1)").then([](auto) {
return StatusOr<google::storage::v2::QueryWriteStatusResponse>(
TransientError());
});
})
.WillOnce(
[&](auto&, auto, auto,
google::storage::v2::QueryWriteStatusRequest const& request) {
EXPECT_EQ(request.upload_id(), "test-upload-id");
EXPECT_CALL(*mock, AsyncStartResumableWrite).WillOnce([&] {
return sequencer.PushBack("StartResumableWrite").then([](auto) {
auto response = google::storage::v2::StartResumableWriteResponse{};
response.set_upload_id("test-upload-id");
return make_status_or(response);
});
});

return sequencer.PushBack("QueryWriteStatus(2)").then([](auto) {
auto response = google::storage::v2::QueryWriteStatusResponse{};
response.set_persisted_size(0);
return make_status_or(response);
});
});
// We expect two calls to AsyncBidiWriteObject.
// 1. The first is for the initial upload, which will fail.
// 2. The second is for the resume attempt, which will be created and then
// cancelled as the finalize operation fails.
EXPECT_CALL(*mock, AsyncBidiWriteObject)
.WillOnce(
[&] { return MakeErrorBidiWriteStream(sequencer, TransientError()); })
.WillOnce([&]() {
auto stream = std::make_unique<MockAsyncBidiWriteObjectStream>();
EXPECT_CALL(*stream, Start).WillOnce([&] {
return sequencer.PushBack("Start(1)");
});
EXPECT_CALL(*stream, Write)
.WillOnce(
[&](google::storage::v2::BidiWriteObjectRequest const& request,
grpc::WriteOptions wopt) {
EXPECT_EQ(request.upload_id(), "test-upload-id");
EXPECT_TRUE(request.finish_write());
EXPECT_TRUE(request.has_object_checksums());
EXPECT_TRUE(wopt.is_last_message());
return sequencer.PushBack("Write");
});
EXPECT_CALL(*stream, Write).WillOnce([&] {
return sequencer.PushBack("Write");
});
EXPECT_CALL(*stream, Read).WillOnce([&] {
return sequencer.PushBack("Read").then([](auto) {
return absl::optional<
google::storage::v2::BidiWriteObjectResponse>{};
});
});
EXPECT_CALL(*stream, Cancel).Times(1);
// This stream finishes with an error, triggering the resume logic.
EXPECT_CALL(*stream, Finish).WillOnce([&] {
return sequencer.PushBack("Finish").then(
[](auto) { return TransientError(); });
return sequencer.PushBack("Finish(1)").then([](auto) {
return TransientError();
});
});
return std::unique_ptr<AsyncBidiWriteObjectStream>(std::move(stream));
})
.WillOnce([&]() {
auto stream = std::make_unique<MockAsyncBidiWriteObjectStream>();
// This stream is created but then immediately cancelled.
EXPECT_CALL(*stream, Start).WillOnce([&] {
return sequencer.PushBack("Start(2)");
});
EXPECT_CALL(*stream, Write)
.WillOnce(
[&](google::storage::v2::BidiWriteObjectRequest const& request,
grpc::WriteOptions wopt) {
EXPECT_EQ(request.upload_id(), "test-upload-id");
EXPECT_TRUE(request.finish_write());
EXPECT_TRUE(request.has_object_checksums());
EXPECT_TRUE(wopt.is_last_message());
return sequencer.PushBack("Write");
});
EXPECT_CALL(*stream, Read).WillOnce([&] {
return sequencer.PushBack("Read").then([](auto) {
auto response = google::storage::v2::BidiWriteObjectResponse{};
response.mutable_resource()->set_bucket(
"projects/_/buckets/test-bucket");
response.mutable_resource()->set_name("test-object");
response.mutable_resource()->set_generation(123456);
return absl::make_optional(std::move(response));
});
});
EXPECT_CALL(*stream, Cancel).Times(1);
EXPECT_CALL(*stream, Finish).WillOnce([&] {
return sequencer.PushBack("Finish").then(
[](auto) { return Status{}; });
return sequencer.PushBack("Finish(2)").then([](auto) {
return Status{};
});
});
return std::unique_ptr<AsyncBidiWriteObjectStream>(std::move(stream));
});

// After the first stream fails, the connection queries the status to resume.
EXPECT_CALL(*mock, AsyncQueryWriteStatus)
.WillOnce(
[&](auto&, auto, auto,
google::storage::v2::QueryWriteStatusRequest const& request) {
EXPECT_EQ(request.upload_id(), "test-upload-id");
return sequencer.PushBack("QueryWriteStatus").then([](auto) {
auto response = google::storage::v2::QueryWriteStatusResponse{};
response.set_persisted_size(0);
return make_status_or(response);
});
});

internal::AutomaticallyCreatedBackgroundThreads pool(1);
auto connection = MakeTestConnection(pool.cq(), mock);
auto request = google::storage::v2::StartResumableWriteRequest{};
ASSERT_TRUE(TextFormat::ParseFromString(kExpectedRequest, &request));
auto pending = connection->StartBufferedUpload(
{std::move(request), connection->options()});

// Start the upload and get the writer.
auto pending = connection->StartBufferedUpload(
{google::storage::v2::StartResumableWriteRequest{},
connection->options()});
auto next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "StartResumableWrite(1)");
next.first.set_value(true);

next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "StartResumableWrite(2)");
EXPECT_EQ(next.second, "StartResumableWrite");
next.first.set_value(true);

next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Start");
next.first.set_value(false); // The first stream fails

next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Finish");
next.first.set_value(false);

next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Start(1)");
next.first.set_value(true);

auto r = pending.get();
ASSERT_STATUS_OK(r);
auto writer = *std::move(r);
EXPECT_EQ(writer->UploadId(), "test-upload-id");
EXPECT_EQ(absl::get<std::int64_t>(writer->PersistedState()), 0);

auto w1 = writer->Finalize({});
// Finalize the writer. This will start the failure/resume sequence.
auto final_status = writer->Finalize({});
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Write");
next.first.set_value(true);
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Read");
next.first.set_value(true);

next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Finish");
next.first.set_value(true);

next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "QueryWriteStatus(1)");
next.first.set_value(true);
EXPECT_EQ(next.second, "Finish(1)");
next.first.set_value(true); // Fails with TransientError

// The buffered writer now starts the resume.
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "QueryWriteStatus(2)");
EXPECT_EQ(next.second, "QueryWriteStatus");
next.first.set_value(true);

// The connection creates the second stream.
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Start(2)");
next.first.set_value(true);
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Write");
next.first.set_value(true);
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Read");
next.first.set_value(true);

auto response = w1.get();
ASSERT_STATUS_OK(response);
EXPECT_EQ(response->bucket(), "projects/_/buckets/test-bucket");
EXPECT_EQ(response->name(), "test-object");
EXPECT_EQ(response->generation(), 123456);
// Now the code fails with the original error.
EXPECT_THAT(final_status.get(), StatusIs(TransientError().code()));

// Clean up the writer, which will cancel and finish the second stream.
writer.reset();
next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Finish");
EXPECT_EQ(next.second, "Finish(2)");
next.first.set_value(true);
}

Expand Down
Loading
Loading