Skip to content

Commit e0b8022

Browse files
authored
impl(pubsub): add noop message batch to connection (#12869)
1 parent 6cb9140 commit e0b8022

5 files changed

Lines changed: 69 additions & 23 deletions

File tree

google/cloud/pubsub/internal/batching_publisher_connection.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
2323
struct Batch {
2424
std::vector<promise<StatusOr<std::string>>> waiters;
2525
std::weak_ptr<BatchingPublisherConnection> weak;
26+
std::shared_ptr<MessageBatch> batch;
2627

2728
void operator()(future<StatusOr<google::pubsub::v1::PublishResponse>> f) {
2829
auto response = f.get();
@@ -41,6 +42,7 @@ struct Batch {
4142
for (auto& w : waiters) {
4243
w.set_value(std::move(*response->mutable_message_ids(idx++)));
4344
}
45+
batch->FlushCallback();
4446
}
4547

4648
void SatisfyAllWaiters(Status const& status) {
@@ -54,6 +56,7 @@ BatchingPublisherConnection::~BatchingPublisherConnection() {
5456

5557
future<StatusOr<std::string>> BatchingPublisherConnection::Publish(
5658
PublishParams p) {
59+
batch_->SaveMessage(p.message);
5760
auto const bytes = MessageSize(p.message);
5861
std::unique_lock<std::mutex> lk(mu_);
5962
do {
@@ -196,6 +199,9 @@ void BatchingPublisherConnection::FlushImpl(std::unique_lock<std::mutex> lk) {
196199

197200
batch.weak = shared_from_this();
198201
request.set_topic(topic_full_name_);
202+
203+
batch_->Flush();
204+
batch.batch = batch_;
199205
sink_->AsyncPublish(std::move(request)).then(std::move(batch));
200206
}
201207

google/cloud/pubsub/internal/batching_publisher_connection.h

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_BATCHING_PUBLISHER_CONNECTION_H
1717

1818
#include "google/cloud/pubsub/internal/batch_sink.h"
19+
#include "google/cloud/pubsub/internal/message_batch.h"
1920
#include "google/cloud/pubsub/publisher_connection.h"
2021
#include "google/cloud/pubsub/version.h"
2122
#include <chrono>
@@ -37,11 +38,12 @@ class BatchingPublisherConnection
3738

3839
static std::shared_ptr<BatchingPublisherConnection> Create(
3940
pubsub::Topic topic, Options opts, std::string ordering_key,
40-
std::shared_ptr<BatchSink> sink, CompletionQueue cq) {
41+
std::shared_ptr<BatchSink> sink, CompletionQueue cq,
42+
std::shared_ptr<MessageBatch> batch) {
4143
return std::shared_ptr<BatchingPublisherConnection>(
42-
new BatchingPublisherConnection(std::move(topic), std::move(opts),
43-
std::move(ordering_key),
44-
std::move(sink), std::move(cq)));
44+
new BatchingPublisherConnection(
45+
std::move(topic), std::move(opts), std::move(ordering_key),
46+
std::move(sink), std::move(cq), std::move(batch)));
4547
}
4648

4749
future<StatusOr<std::string>> Publish(PublishParams p) override;
@@ -54,13 +56,15 @@ class BatchingPublisherConnection
5456
explicit BatchingPublisherConnection(pubsub::Topic topic, Options opts,
5557
std::string ordering_key,
5658
std::shared_ptr<BatchSink> sink,
57-
CompletionQueue cq)
59+
CompletionQueue cq,
60+
std::shared_ptr<MessageBatch> batch)
5861
: topic_(std::move(topic)),
5962
topic_full_name_(topic_.FullName()),
6063
opts_(std::move(opts)),
6164
ordering_key_(std::move(ordering_key)),
6265
sink_(std::move(sink)),
63-
cq_(std::move(cq)) {}
66+
cq_(std::move(cq)),
67+
batch_(std::move(batch)) {}
6468

6569
void OnTimer();
6670
future<StatusOr<std::string>> CorkedError();
@@ -75,6 +79,7 @@ class BatchingPublisherConnection
7579
std::string const ordering_key_;
7680
std::shared_ptr<BatchSink> const sink_;
7781
CompletionQueue cq_;
82+
std::shared_ptr<MessageBatch> const batch_;
7883

7984
std::mutex mu_;
8085
std::vector<promise<StatusOr<std::string>>> waiters_;

google/cloud/pubsub/internal/batching_publisher_connection_test.cc

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include "google/cloud/pubsub/internal/batching_publisher_connection.h"
1616
#include "google/cloud/pubsub/internal/defaults.h"
1717
#include "google/cloud/pubsub/testing/mock_batch_sink.h"
18+
#include "google/cloud/pubsub/testing/mock_message_batch.h"
1819
#include "google/cloud/future.h"
1920
#include "google/cloud/internal/random.h"
2021
#include "google/cloud/testing_util/async_sequencer.h"
@@ -40,6 +41,7 @@ using ::testing::AtLeast;
4041
using ::testing::Contains;
4142
using ::testing::ElementsAre;
4243
using ::testing::HasSubstr;
44+
using ::testing::NiceMock;
4345

4446
google::pubsub::v1::PublishResponse MakeResponse(
4547
google::pubsub::v1::PublishRequest const& request) {
@@ -62,6 +64,8 @@ std::vector<std::string> MessagesData(
6264

6365
TEST(BatchingPublisherConnectionTest, FastDestructor) {
6466
auto mock = std::make_shared<pubsub_testing::MockBatchSink>();
67+
auto mock_batch =
68+
std::make_shared<NiceMock<pubsub_testing::MockMessageBatch>>();
6569
pubsub::Topic const topic("test-project", "test-topic");
6670

6771
AsyncSequencer<void> async;
@@ -79,7 +83,7 @@ TEST(BatchingPublisherConnectionTest, FastDestructor) {
7983
Options{}
8084
.set<pubsub::MaxBatchMessagesOption>(4)
8185
.set<pubsub::MaxHoldTimeOption>(kMaxHoldTime)),
82-
ordering_key, mock, background.cq());
86+
ordering_key, mock, background.cq(), mock_batch);
8387

8488
// Publishing a message starts the batch timer.
8589
auto pending = publisher->Publish(
@@ -96,6 +100,8 @@ TEST(BatchingPublisherConnectionTest, FastDestructor) {
96100

97101
TEST(BatchingPublisherConnectionTest, DefaultMakesProgress) {
98102
auto mock = std::make_shared<pubsub_testing::MockBatchSink>();
103+
auto mock_batch =
104+
std::make_shared<NiceMock<pubsub_testing::MockMessageBatch>>();
99105
pubsub::Topic const topic("test-project", "test-topic");
100106

101107
AsyncSequencer<void> async;
@@ -126,7 +132,7 @@ TEST(BatchingPublisherConnectionTest, DefaultMakesProgress) {
126132
Options{}
127133
.set<pubsub::MaxBatchMessagesOption>(4)
128134
.set<pubsub::MaxHoldTimeOption>(kMaxHoldTime)),
129-
ordering_key, mock, background.cq());
135+
ordering_key, mock, background.cq(), mock_batch);
130136

131137
// We expect the responses to be satisfied in the context of the completion
132138
// queue threads. This is an important property, the processing of any
@@ -163,6 +169,8 @@ TEST(BatchingPublisherConnectionTest, DefaultMakesProgress) {
163169

164170
TEST(BatchingPublisherConnectionTest, BatchByMessageCount) {
165171
auto mock = std::make_shared<pubsub_testing::MockBatchSink>();
172+
auto mock_batch =
173+
std::make_shared<NiceMock<pubsub_testing::MockMessageBatch>>();
166174
pubsub::Topic const topic("test-project", "test-topic");
167175

168176
EXPECT_CALL(*mock, AsyncPublish)
@@ -190,7 +198,7 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageCount) {
190198
.set<pubsub::MaxBatchMessagesOption>(2)
191199
.set<pubsub::MaxBatchBytesOption>(kMaxBytes)
192200
.set<pubsub::MaxHoldTimeOption>(kMaxHoldTime)),
193-
ordering_key, mock, background.cq());
201+
ordering_key, mock, background.cq(), mock_batch);
194202
auto r0 =
195203
publisher
196204
->Publish({pubsub::MessageBuilder{}.SetData("test-data-0").Build()})
@@ -215,6 +223,8 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageCount) {
215223

216224
TEST(BatchingPublisherConnectionTest, BatchByMessageSize) {
217225
auto mock = std::make_shared<pubsub_testing::MockBatchSink>();
226+
auto mock_batch =
227+
std::make_shared<NiceMock<pubsub_testing::MockMessageBatch>>();
218228
pubsub::Topic const topic("test-project", "test-topic");
219229

220230
EXPECT_CALL(*mock, AsyncPublish)
@@ -245,7 +255,7 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageSize) {
245255
.set<pubsub::MaxBatchMessagesOption>(4)
246256
.set<pubsub::MaxBatchBytesOption>(max_bytes)
247257
.set<pubsub::MaxHoldTimeOption>(kMaxHoldTime)),
248-
ordering_key, mock, background.cq());
258+
ordering_key, mock, background.cq(), mock_batch);
249259
auto r0 = publisher->Publish({m0}).then([](future<StatusOr<std::string>> f) {
250260
auto r = f.get();
251261
ASSERT_STATUS_OK(r);
@@ -263,6 +273,8 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageSize) {
263273
}
264274

265275
TEST(BatchingPublisherConnectionTest, BatchByMessageSizeLargeMessageBreak) {
276+
auto mock_batch =
277+
std::make_shared<NiceMock<pubsub_testing::MockMessageBatch>>();
266278
pubsub::Topic const topic("test-project", "test-topic");
267279

268280
auto constexpr kSinglePayload = 128;
@@ -303,7 +315,7 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageSizeLargeMessageBreak) {
303315
Options{}
304316
.set<pubsub::MaxBatchMessagesOption>(100)
305317
.set<pubsub::MaxBatchBytesOption>(kBatchLimit)),
306-
ordering_key, mock, cq);
318+
ordering_key, mock, cq, mock_batch);
307319
std::vector<future<Status>> results;
308320
for (int i = 0; i != 3; ++i) {
309321
results.push_back(
@@ -331,6 +343,8 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageSizeLargeMessageBreak) {
331343
}
332344

333345
TEST(BatchingPublisherConnectionTest, BatchByMessageSizeOversizedSingleton) {
346+
auto mock_batch =
347+
std::make_shared<NiceMock<pubsub_testing::MockMessageBatch>>();
334348
pubsub::Topic const topic("test-project", "test-topic");
335349

336350
auto constexpr kSinglePayload = 128;
@@ -381,7 +395,7 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageSizeOversizedSingleton) {
381395
Options{}
382396
.set<pubsub::MaxBatchMessagesOption>(100)
383397
.set<pubsub::MaxBatchBytesOption>(kBatchLimit)),
384-
ordering_key, mock, cq);
398+
ordering_key, mock, cq, mock_batch);
385399
std::vector<future<Status>> results;
386400
auto publish_single = [&] {
387401
results.push_back(
@@ -413,6 +427,8 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageSizeOversizedSingleton) {
413427
}
414428

415429
TEST(BatchingPublisherConnectionTest, BatchTorture) {
430+
auto mock_batch =
431+
std::make_shared<NiceMock<pubsub_testing::MockMessageBatch>>();
416432
pubsub::Topic const topic("test-project", "test-topic");
417433

418434
auto constexpr kMaxMessages = 20;
@@ -448,7 +464,7 @@ TEST(BatchingPublisherConnectionTest, BatchTorture) {
448464
Options{}
449465
.set<pubsub::MaxBatchMessagesOption>(kMaxMessages)
450466
.set<pubsub::MaxBatchBytesOption>(kMaxPayload)),
451-
ordering_key, mock, background.cq());
467+
ordering_key, mock, background.cq(), mock_batch);
452468

453469
auto worker = [&](int iterations) {
454470
auto gen = google::cloud::internal::DefaultPRNG(std::random_device{}());
@@ -477,6 +493,8 @@ TEST(BatchingPublisherConnectionTest, BatchTorture) {
477493

478494
TEST(BatchingPublisherConnectionTest, BatchByMaximumHoldTime) {
479495
auto mock = std::make_shared<pubsub_testing::MockBatchSink>();
496+
auto mock_batch =
497+
std::make_shared<NiceMock<pubsub_testing::MockMessageBatch>>();
480498
pubsub::Topic const topic("test-project", "test-topic");
481499

482500
EXPECT_CALL(*mock, AsyncPublish)
@@ -500,7 +518,7 @@ TEST(BatchingPublisherConnectionTest, BatchByMaximumHoldTime) {
500518
Options{}
501519
.set<pubsub::MaxBatchMessagesOption>(4)
502520
.set<pubsub::MaxHoldTimeOption>(std::chrono::milliseconds(5))),
503-
/*ordering_key=*/{}, mock, cq);
521+
/*ordering_key=*/{}, mock, cq, mock_batch);
504522
auto r0 =
505523
publisher
506524
->Publish({pubsub::MessageBuilder{}.SetData("test-data-0").Build()})
@@ -531,6 +549,8 @@ TEST(BatchingPublisherConnectionTest, BatchByMaximumHoldTime) {
531549

532550
TEST(BatchingPublisherConnectionTest, BatchByFlush) {
533551
auto mock = std::make_shared<pubsub_testing::MockBatchSink>();
552+
auto mock_batch =
553+
std::make_shared<NiceMock<pubsub_testing::MockMessageBatch>>();
534554
pubsub::Topic const topic("test-project", "test-topic");
535555

536556
EXPECT_CALL(*mock, AsyncPublish)
@@ -565,7 +585,7 @@ TEST(BatchingPublisherConnectionTest, BatchByFlush) {
565585
Options{}
566586
.set<pubsub::MaxBatchMessagesOption>(4)
567587
.set<pubsub::MaxHoldTimeOption>(std::chrono::milliseconds(5))),
568-
ordering_key, mock, cq);
588+
ordering_key, mock, cq, mock_batch);
569589

570590
std::vector<future<void>> results;
571591
for (auto i : {0, 1}) {
@@ -608,6 +628,8 @@ TEST(BatchingPublisherConnectionTest, BatchByFlush) {
608628

609629
TEST(BatchingPublisherConnectionTest, HandleError) {
610630
auto mock = std::make_shared<pubsub_testing::MockBatchSink>();
631+
auto mock_batch =
632+
std::make_shared<NiceMock<pubsub_testing::MockMessageBatch>>();
611633
pubsub::Topic const topic("test-project", "test-topic");
612634

613635
auto const error_status = Status(StatusCode::kPermissionDenied, "uh-oh");
@@ -622,7 +644,7 @@ TEST(BatchingPublisherConnectionTest, HandleError) {
622644
auto publisher = BatchingPublisherConnection::Create(
623645
topic,
624646
DefaultPublisherOptions(Options{}.set<pubsub::MaxBatchMessagesOption>(2)),
625-
ordering_key, mock, bg.cq());
647+
ordering_key, mock, bg.cq(), mock_batch);
626648
auto r0 = publisher->Publish(
627649
{pubsub::MessageBuilder{}.SetData("test-data-0").Build()});
628650
auto r1 = publisher->Publish(
@@ -636,6 +658,8 @@ TEST(BatchingPublisherConnectionTest, HandleError) {
636658

637659
TEST(BatchingPublisherConnectionTest, HandleInvalidResponse) {
638660
auto mock = std::make_shared<pubsub_testing::MockBatchSink>();
661+
auto mock_batch =
662+
std::make_shared<NiceMock<pubsub_testing::MockMessageBatch>>();
639663
pubsub::Topic const topic("test-project", "test-topic");
640664

641665
EXPECT_CALL(*mock, AsyncPublish)
@@ -648,7 +672,7 @@ TEST(BatchingPublisherConnectionTest, HandleInvalidResponse) {
648672
auto publisher = BatchingPublisherConnection::Create(
649673
topic,
650674
DefaultPublisherOptions(Options{}.set<pubsub::MaxBatchMessagesOption>(2)),
651-
"test-ordering-key", mock, background.cq());
675+
"test-ordering-key", mock, background.cq(), mock_batch);
652676
auto r0 = publisher->Publish(
653677
{pubsub::MessageBuilder{}.SetData("test-data-0").Build()});
654678
auto r1 = publisher->Publish(
@@ -662,6 +686,8 @@ TEST(BatchingPublisherConnectionTest, HandleInvalidResponse) {
662686

663687
TEST(BatchingPublisherConnectionTest, HandleErrorWithOrderingPartialBatch) {
664688
auto mock = std::make_shared<pubsub_testing::MockBatchSink>();
689+
auto mock_batch =
690+
std::make_shared<NiceMock<pubsub_testing::MockMessageBatch>>();
665691
pubsub::Topic const topic("test-project", "test-topic");
666692

667693
auto const error_status = Status(StatusCode::kPermissionDenied, "uh-oh");
@@ -682,7 +708,7 @@ TEST(BatchingPublisherConnectionTest, HandleErrorWithOrderingPartialBatch) {
682708
topic,
683709
DefaultPublisherOptions(
684710
Options{}.set<pubsub::MaxBatchMessagesOption>(kBatchSize)),
685-
ordering_key, mock, cq);
711+
ordering_key, mock, cq, mock_batch);
686712
std::vector<future<StatusOr<std::string>>> results;
687713
// Create a full batch (by message count) and a partial batch.
688714
for (int i = 0; i != kBatchSize + kBatchSize / 2; ++i) {
@@ -710,6 +736,8 @@ TEST(BatchingPublisherConnectionTest, HandleErrorWithOrderingPartialBatch) {
710736

711737
TEST(BatchingPublisherConnectionTest, HandleErrorWithOrderingResume) {
712738
auto mock = std::make_shared<pubsub_testing::MockBatchSink>();
739+
auto mock_batch =
740+
std::make_shared<NiceMock<pubsub_testing::MockMessageBatch>>();
713741
pubsub::Topic const topic("test-project", "test-topic");
714742
auto const ordering_key = std::string{"test-key"};
715743

@@ -743,7 +771,7 @@ TEST(BatchingPublisherConnectionTest, HandleErrorWithOrderingResume) {
743771
Options{}
744772
.set<pubsub::MaxBatchMessagesOption>(kBatchSize)
745773
.set<pubsub::MaxHoldTimeOption>(kMaxHoldTime)),
746-
ordering_key, mock, cq);
774+
ordering_key, mock, cq, mock_batch);
747775
std::vector<future<StatusOr<std::string>>> results;
748776
// Create a full batch (by size).
749777
for (int i = 0; i != kBatchSize; ++i) {

google/cloud/pubsub/internal/noop_message_batch.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class NoOpMessageBatch : public MessageBatch {
3131

3232
~NoOpMessageBatch() override = default;
3333

34-
void SaveMessage(pubsub::Message m) override{};
34+
void SaveMessage(pubsub::Message) override{};
3535

3636
void Flush() override{};
3737

google/cloud/pubsub/publisher_connection.cc

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
#include "google/cloud/pubsub/internal/defaults.h"
2222
#include "google/cloud/pubsub/internal/flow_controlled_publisher_connection.h"
2323
#include "google/cloud/pubsub/internal/flow_controlled_publisher_tracing_connection.h"
24+
#include "google/cloud/pubsub/internal/message_batch.h"
25+
#include "google/cloud/pubsub/internal/noop_message_batch.h"
2426
#include "google/cloud/pubsub/internal/ordering_key_publisher_connection.h"
2527
#include "google/cloud/pubsub/internal/publisher_stub_factory.h"
2628
#include "google/cloud/pubsub/internal/publisher_tracing_connection.h"
@@ -47,23 +49,28 @@ std::shared_ptr<pubsub::PublisherConnection> ConnectionFromDecoratedStub(
4749
auto cq = background->cq();
4850
std::shared_ptr<pubsub_internal::BatchSink> sink =
4951
pubsub_internal::DefaultBatchSink::Create(stub, cq, opts);
52+
std::shared_ptr<pubsub_internal::MessageBatch> message_batch =
53+
std::make_shared<pubsub_internal::NoOpMessageBatch>();
5054
if (opts.get<pubsub::MessageOrderingOption>()) {
51-
auto factory = [topic, opts, sink, cq](std::string const& key) {
55+
auto factory = [topic, opts, sink, cq,
56+
message_batch](std::string const& key) {
5257
auto used_sink = sink;
5358
if (!key.empty()) {
5459
// Only wrap the sink if there is an ordering key.
5560
used_sink = pubsub_internal::SequentialBatchSink::Create(
5661
std::move(used_sink));
5762
}
5863
return pubsub_internal::BatchingPublisherConnection::Create(
59-
topic, opts, key, std::move(used_sink), cq);
64+
topic, opts, key, std::move(used_sink), cq,
65+
std::move(message_batch));
6066
};
6167
return pubsub_internal::OrderingKeyPublisherConnection::Create(
6268
std::move(factory));
6369
}
6470
return pubsub_internal::RejectsWithOrderingKey::Create(
6571
pubsub_internal::BatchingPublisherConnection::Create(
66-
topic, opts, {}, std::move(sink), std::move(cq)));
72+
topic, opts, {}, std::move(sink), std::move(cq),
73+
std::move(message_batch)));
6774
};
6875
auto tracing_enabled = google::cloud::internal::TracingEnabled(opts);
6976
auto connection = make_connection();

0 commit comments

Comments
 (0)