Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion google/cloud/storage/async/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,28 @@ future<StatusOr<google::storage::v2::Object>> AsyncWriter::Finalize(
return Finalize(std::move(token), WritePayload{});
}

future<Status> AsyncWriter::Close() {
future<Status> AsyncWriter::Flush() {
if (!impl_) {
return make_ready_future(
internal::CancelledError("closed stream", GCP_ERROR_INFO()));
}

return impl_->Flush(WritePayload{}).then([impl = impl_](auto f) {
return f.get();
});
}

future<Status> AsyncWriter::Close() {
if (!impl_) {
return make_ready_future(
internal::CancelledError("closed stream", GCP_ERROR_INFO()));
}

return impl_->Flush(WritePayload{}).then([impl = std::move(impl_)](auto f) {
return f.get();
});
}

RpcMetadata AsyncWriter::GetRequestMetadata() const {
return impl_->GetRequestMetadata();
}
Expand Down
17 changes: 17 additions & 0 deletions google/cloud/storage/async/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,25 @@ class AsyncWriter {
future<StatusOr<google::storage::v2::Object>> Finalize(AsyncToken token,
WritePayload payload);

/**
* Flush any buffered data to the service.
*
* For buffered uploads, this forces any data in the buffer to be sent to the
* service. The returned future is satisfied when the service acknowledges
* the flush. Note that the service may not have persisted the data, it may
* only be in ephemeral storage. To query the amount of persisted data use
* `PersistedState()` after the flush completes.
*
* @note This is not a terminal operation. The `AsyncWriter` can be used for
* further `Write()` or `Finalize()` operations.
*/
future<Status> Flush();

/**
* Close the upload by flushing the remaining data in buffer.
*
* @warning This is a terminal operation. The `AsyncWriter` object is not
* usable after this call.
*/
future<Status> Close();

Expand Down
134 changes: 134 additions & 0 deletions google/cloud/storage/async/writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,140 @@ TEST(AsyncWriterTest, FinalizeEmpty) {
EXPECT_STATUS_OK(actual);
}

TEST(AsyncWriterTest, Flush) {
auto mock = std::make_unique<MockAsyncWriterConnection>();
::testing::InSequence sequence;
EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] {
return make_ready_future(Status{});
});

AsyncWriter writer(std::move(mock));
auto const actual = writer.Flush().get();
EXPECT_STATUS_OK(actual);
}

TEST(AsyncWriterTest, ErrorOnFlush) {
auto mock = std::make_unique<MockAsyncWriterConnection>();
EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] {
return make_ready_future(PermanentError());
});

AsyncWriter writer(std::move(mock));
auto const actual = writer.Flush().get();
EXPECT_THAT(actual, StatusIs(PermanentError().code()));
}

TEST(AsyncWriterTest, FlushOnDefaultConstructed) {
AsyncWriter writer;
auto const actual = writer.Flush().get();
EXPECT_THAT(actual, StatusIs(StatusCode::kCancelled, "closed stream"));
}

TEST(AsyncWriterTest, FlushThenWrite) {
auto mock = std::make_unique<MockAsyncWriterConnection>();
auto* mock_ptr = mock.get();
::testing::InSequence sequence;
EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] {
return make_ready_future(Status{});
});
EXPECT_CALL(*mock, Write).WillOnce([](auto const&) {
return make_ready_future(Status{});
});

AsyncWriter writer(std::move(mock));
ASSERT_STATUS_OK(writer.Flush().get());

auto token = storage_internal::MakeAsyncToken(mock_ptr);
auto const actual = writer.Write(std::move(token), {}).get();
EXPECT_STATUS_OK(actual);
}

TEST(AsyncWriterTest, MultipleFlushesAreQueuedAndSequential) {
auto mock = std::make_unique<MockAsyncWriterConnection>();
::testing::InSequence sequence;
// Expect three flushes, each with empty payload, and simulate delayed
// completion.
EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] {
// Simulate async completion for first flush
return make_ready_future(Status{});
});
EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] {
// Simulate async completion for second flush
return make_ready_future(Status{});
});
EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] {
// Simulate async completion for third flush
return make_ready_future(Status{});
});

AsyncWriter writer(std::move(mock));
// Issue three flushes in quick succession
auto f1 = writer.Flush();
auto f2 = writer.Flush();
auto f3 = writer.Flush();

// All futures should be satisfied in order
EXPECT_STATUS_OK(f1.get());
EXPECT_STATUS_OK(f2.get());
EXPECT_STATUS_OK(f3.get());
}

TEST(AsyncWriterTest, Close) {
auto mock = std::make_unique<MockAsyncWriterConnection>();
::testing::InSequence sequence;
EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] {
return make_ready_future(Status{});
});

AsyncWriter writer(std::move(mock));
auto const actual = writer.Close().get();
EXPECT_STATUS_OK(actual);
}

TEST(AsyncWriterTest, CloseOnDefaultConstructed) {
AsyncWriter writer;
auto const actual = writer.Close().get();
EXPECT_THAT(actual, StatusIs(StatusCode::kCancelled, "closed stream"));
}

TEST(AsyncWriterTest, CloseOnMovedWriter) {
auto mock = std::make_unique<MockAsyncWriterConnection>();
EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] {
return make_ready_future(Status{});
});
AsyncWriter writer(std::move(mock));
AsyncWriter moved(std::move(writer));
auto const actual = moved.Close().get();
EXPECT_STATUS_OK(actual);
}

TEST(AsyncWriterTest, ErrorOnWriteAfterClose) {
auto mock = std::make_unique<MockAsyncWriterConnection>();
auto* mock_ptr = mock.get();
EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] {
return make_ready_future(Status{});
});

AsyncWriter writer(std::move(mock));
ASSERT_STATUS_OK(writer.Close().get());

// Create a token for the (now invalid) writer.
auto token = storage_internal::MakeAsyncToken(mock_ptr);
auto const actual = writer.Write(std::move(token), {}).get();
EXPECT_THAT(actual, StatusIs(StatusCode::kCancelled));
}

TEST(AsyncWriterTest, ErrorOnClose) {
auto mock = std::make_unique<MockAsyncWriterConnection>();
EXPECT_CALL(*mock, Flush(WritePayloadContents(IsEmpty()))).WillOnce([] {
return make_ready_future(PermanentError());
});

AsyncWriter writer(std::move(mock));
auto const actual = writer.Close().get();
EXPECT_THAT(actual, StatusIs(PermanentError().code()));
}

TEST(AsyncWriterTest, ErrorDuringWrite) {
auto mock = std::make_unique<MockAsyncWriterConnection>();
EXPECT_CALL(*mock, Write).WillOnce([] {
Expand Down
41 changes: 41 additions & 0 deletions google/cloud/storage/tests/async_client_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -905,6 +905,47 @@ TEST_F(AsyncClientIntegrationTest, ResumeFinalizedAppendableObjectUpload) {
IsProtoEqual(*metadata)));
}

TEST_F(AsyncClientIntegrationTest, ExplicitFlushAppendableObjectUpload) {
if (!UsingEmulator()) GTEST_SKIP();
auto async = AsyncClient(TestOptions());
auto client = MakeIntegrationTestClient(true, TestOptions());
auto object_name = MakeRandomObjectName();
// Create a small block to send over and over.
auto constexpr kBlockSize = static_cast<std::int64_t>(256 * 1024);
auto const block = MakeRandomData(kBlockSize);

auto create =
client.CreateBucket(bucket_name(), storage::BucketMetadata{}
.set_location("us-west4")
.set_storage_class("RAPID"));
if (!create && create.status().code() != StatusCode::kAlreadyExists) {
GTEST_FAIL() << "cannot create bucket: " << create.status();
}
auto w =
async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name)
.get();
ASSERT_STATUS_OK(w);
AsyncWriter writer;
AsyncToken token;
std::tie(writer, token) = *std::move(w);

auto p = writer.Write(std::move(token), WritePayload(block)).get();
ASSERT_STATUS_OK(p);
token = *std::move(p);

// Explicitly flush the data.
auto flush_status = writer.Flush().get();
EXPECT_STATUS_OK(flush_status);

auto metadata = writer.Finalize(std::move(token)).get();
ASSERT_STATUS_OK(metadata);
ScheduleForDelete(*metadata);

EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName());
EXPECT_EQ(metadata->name(), object_name);
EXPECT_EQ(metadata->size(), kBlockSize);
}

TEST_F(AsyncClientIntegrationTest, Open) {
if (!UsingEmulator()) GTEST_SKIP();
auto async = AsyncClient(TestOptions());
Expand Down
Loading