1515#include " google/cloud/pubsub/internal/batching_publisher_connection.h"
1616#include " google/cloud/pubsub/internal/defaults.h"
1717#include " google/cloud/pubsub/testing/mock_batch_sink.h"
18+ #include " google/cloud/pubsub/testing/mock_message_batch.h"
1819#include " google/cloud/future.h"
1920#include " google/cloud/internal/random.h"
2021#include " google/cloud/testing_util/async_sequencer.h"
@@ -40,6 +41,7 @@ using ::testing::AtLeast;
4041using ::testing::Contains;
4142using ::testing::ElementsAre;
4243using ::testing::HasSubstr;
44+ using ::testing::NiceMock;
4345
4446google::pubsub::v1::PublishResponse MakeResponse (
4547 google::pubsub::v1::PublishRequest const & request) {
@@ -62,6 +64,8 @@ std::vector<std::string> MessagesData(
6264
6365TEST (BatchingPublisherConnectionTest, FastDestructor) {
6466 auto mock = std::make_shared<pubsub_testing::MockBatchSink>();
67+ auto mock_batch =
68+ std::make_shared<NiceMock<pubsub_testing::MockMessageBatch>>();
6569 pubsub::Topic const topic (" test-project" , " test-topic" );
6670
6771 AsyncSequencer<void > async;
@@ -79,7 +83,7 @@ TEST(BatchingPublisherConnectionTest, FastDestructor) {
7983 Options{}
8084 .set <pubsub::MaxBatchMessagesOption>(4 )
8185 .set <pubsub::MaxHoldTimeOption>(kMaxHoldTime )),
82- ordering_key, mock, background.cq ());
86+ ordering_key, mock, background.cq (), mock_batch );
8387
8488 // Publishing a message starts the batch timer.
8589 auto pending = publisher->Publish (
@@ -96,6 +100,8 @@ TEST(BatchingPublisherConnectionTest, FastDestructor) {
96100
97101TEST (BatchingPublisherConnectionTest, DefaultMakesProgress) {
98102 auto mock = std::make_shared<pubsub_testing::MockBatchSink>();
103+ auto mock_batch =
104+ std::make_shared<NiceMock<pubsub_testing::MockMessageBatch>>();
99105 pubsub::Topic const topic (" test-project" , " test-topic" );
100106
101107 AsyncSequencer<void > async;
@@ -126,7 +132,7 @@ TEST(BatchingPublisherConnectionTest, DefaultMakesProgress) {
126132 Options{}
127133 .set <pubsub::MaxBatchMessagesOption>(4 )
128134 .set <pubsub::MaxHoldTimeOption>(kMaxHoldTime )),
129- ordering_key, mock, background.cq ());
135+ ordering_key, mock, background.cq (), mock_batch );
130136
131137 // We expect the responses to be satisfied in the context of the completion
132138 // queue threads. This is an important property, the processing of any
@@ -163,6 +169,8 @@ TEST(BatchingPublisherConnectionTest, DefaultMakesProgress) {
163169
164170TEST (BatchingPublisherConnectionTest, BatchByMessageCount) {
165171 auto mock = std::make_shared<pubsub_testing::MockBatchSink>();
172+ auto mock_batch =
173+ std::make_shared<NiceMock<pubsub_testing::MockMessageBatch>>();
166174 pubsub::Topic const topic (" test-project" , " test-topic" );
167175
168176 EXPECT_CALL (*mock, AsyncPublish)
@@ -190,7 +198,7 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageCount) {
190198 .set <pubsub::MaxBatchMessagesOption>(2 )
191199 .set <pubsub::MaxBatchBytesOption>(kMaxBytes )
192200 .set <pubsub::MaxHoldTimeOption>(kMaxHoldTime )),
193- ordering_key, mock, background.cq ());
201+ ordering_key, mock, background.cq (), mock_batch );
194202 auto r0 =
195203 publisher
196204 ->Publish ({pubsub::MessageBuilder{}.SetData (" test-data-0" ).Build ()})
@@ -215,6 +223,8 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageCount) {
215223
216224TEST (BatchingPublisherConnectionTest, BatchByMessageSize) {
217225 auto mock = std::make_shared<pubsub_testing::MockBatchSink>();
226+ auto mock_batch =
227+ std::make_shared<NiceMock<pubsub_testing::MockMessageBatch>>();
218228 pubsub::Topic const topic (" test-project" , " test-topic" );
219229
220230 EXPECT_CALL (*mock, AsyncPublish)
@@ -245,7 +255,7 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageSize) {
245255 .set <pubsub::MaxBatchMessagesOption>(4 )
246256 .set <pubsub::MaxBatchBytesOption>(max_bytes)
247257 .set <pubsub::MaxHoldTimeOption>(kMaxHoldTime )),
248- ordering_key, mock, background.cq ());
258+ ordering_key, mock, background.cq (), mock_batch );
249259 auto r0 = publisher->Publish ({m0}).then ([](future<StatusOr<std::string>> f) {
250260 auto r = f.get ();
251261 ASSERT_STATUS_OK (r);
@@ -263,6 +273,8 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageSize) {
263273}
264274
265275TEST (BatchingPublisherConnectionTest, BatchByMessageSizeLargeMessageBreak) {
276+ auto mock_batch =
277+ std::make_shared<NiceMock<pubsub_testing::MockMessageBatch>>();
266278 pubsub::Topic const topic (" test-project" , " test-topic" );
267279
268280 auto constexpr kSinglePayload = 128 ;
@@ -303,7 +315,7 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageSizeLargeMessageBreak) {
303315 Options{}
304316 .set <pubsub::MaxBatchMessagesOption>(100 )
305317 .set <pubsub::MaxBatchBytesOption>(kBatchLimit )),
306- ordering_key, mock, cq);
318+ ordering_key, mock, cq, mock_batch );
307319 std::vector<future<Status>> results;
308320 for (int i = 0 ; i != 3 ; ++i) {
309321 results.push_back (
@@ -331,6 +343,8 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageSizeLargeMessageBreak) {
331343}
332344
333345TEST (BatchingPublisherConnectionTest, BatchByMessageSizeOversizedSingleton) {
346+ auto mock_batch =
347+ std::make_shared<NiceMock<pubsub_testing::MockMessageBatch>>();
334348 pubsub::Topic const topic (" test-project" , " test-topic" );
335349
336350 auto constexpr kSinglePayload = 128 ;
@@ -381,7 +395,7 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageSizeOversizedSingleton) {
381395 Options{}
382396 .set <pubsub::MaxBatchMessagesOption>(100 )
383397 .set <pubsub::MaxBatchBytesOption>(kBatchLimit )),
384- ordering_key, mock, cq);
398+ ordering_key, mock, cq, mock_batch );
385399 std::vector<future<Status>> results;
386400 auto publish_single = [&] {
387401 results.push_back (
@@ -413,6 +427,8 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageSizeOversizedSingleton) {
413427}
414428
415429TEST (BatchingPublisherConnectionTest, BatchTorture) {
430+ auto mock_batch =
431+ std::make_shared<NiceMock<pubsub_testing::MockMessageBatch>>();
416432 pubsub::Topic const topic (" test-project" , " test-topic" );
417433
418434 auto constexpr kMaxMessages = 20 ;
@@ -448,7 +464,7 @@ TEST(BatchingPublisherConnectionTest, BatchTorture) {
448464 Options{}
449465 .set <pubsub::MaxBatchMessagesOption>(kMaxMessages )
450466 .set <pubsub::MaxBatchBytesOption>(kMaxPayload )),
451- ordering_key, mock, background.cq ());
467+ ordering_key, mock, background.cq (), mock_batch );
452468
453469 auto worker = [&](int iterations) {
454470 auto gen = google::cloud::internal::DefaultPRNG (std::random_device{}());
@@ -477,6 +493,8 @@ TEST(BatchingPublisherConnectionTest, BatchTorture) {
477493
478494TEST (BatchingPublisherConnectionTest, BatchByMaximumHoldTime) {
479495 auto mock = std::make_shared<pubsub_testing::MockBatchSink>();
496+ auto mock_batch =
497+ std::make_shared<NiceMock<pubsub_testing::MockMessageBatch>>();
480498 pubsub::Topic const topic (" test-project" , " test-topic" );
481499
482500 EXPECT_CALL (*mock, AsyncPublish)
@@ -500,7 +518,7 @@ TEST(BatchingPublisherConnectionTest, BatchByMaximumHoldTime) {
500518 Options{}
501519 .set <pubsub::MaxBatchMessagesOption>(4 )
502520 .set <pubsub::MaxHoldTimeOption>(std::chrono::milliseconds (5 ))),
503- /* ordering_key=*/ {}, mock, cq);
521+ /* ordering_key=*/ {}, mock, cq, mock_batch );
504522 auto r0 =
505523 publisher
506524 ->Publish ({pubsub::MessageBuilder{}.SetData (" test-data-0" ).Build ()})
@@ -531,6 +549,8 @@ TEST(BatchingPublisherConnectionTest, BatchByMaximumHoldTime) {
531549
532550TEST (BatchingPublisherConnectionTest, BatchByFlush) {
533551 auto mock = std::make_shared<pubsub_testing::MockBatchSink>();
552+ auto mock_batch =
553+ std::make_shared<NiceMock<pubsub_testing::MockMessageBatch>>();
534554 pubsub::Topic const topic (" test-project" , " test-topic" );
535555
536556 EXPECT_CALL (*mock, AsyncPublish)
@@ -565,7 +585,7 @@ TEST(BatchingPublisherConnectionTest, BatchByFlush) {
565585 Options{}
566586 .set <pubsub::MaxBatchMessagesOption>(4 )
567587 .set <pubsub::MaxHoldTimeOption>(std::chrono::milliseconds (5 ))),
568- ordering_key, mock, cq);
588+ ordering_key, mock, cq, mock_batch );
569589
570590 std::vector<future<void >> results;
571591 for (auto i : {0 , 1 }) {
@@ -608,6 +628,8 @@ TEST(BatchingPublisherConnectionTest, BatchByFlush) {
608628
609629TEST (BatchingPublisherConnectionTest, HandleError) {
610630 auto mock = std::make_shared<pubsub_testing::MockBatchSink>();
631+ auto mock_batch =
632+ std::make_shared<NiceMock<pubsub_testing::MockMessageBatch>>();
611633 pubsub::Topic const topic (" test-project" , " test-topic" );
612634
613635 auto const error_status = Status (StatusCode::kPermissionDenied , " uh-oh" );
@@ -622,7 +644,7 @@ TEST(BatchingPublisherConnectionTest, HandleError) {
622644 auto publisher = BatchingPublisherConnection::Create (
623645 topic,
624646 DefaultPublisherOptions (Options{}.set <pubsub::MaxBatchMessagesOption>(2 )),
625- ordering_key, mock, bg.cq ());
647+ ordering_key, mock, bg.cq (), mock_batch );
626648 auto r0 = publisher->Publish (
627649 {pubsub::MessageBuilder{}.SetData (" test-data-0" ).Build ()});
628650 auto r1 = publisher->Publish (
@@ -636,6 +658,8 @@ TEST(BatchingPublisherConnectionTest, HandleError) {
636658
637659TEST (BatchingPublisherConnectionTest, HandleInvalidResponse) {
638660 auto mock = std::make_shared<pubsub_testing::MockBatchSink>();
661+ auto mock_batch =
662+ std::make_shared<NiceMock<pubsub_testing::MockMessageBatch>>();
639663 pubsub::Topic const topic (" test-project" , " test-topic" );
640664
641665 EXPECT_CALL (*mock, AsyncPublish)
@@ -648,7 +672,7 @@ TEST(BatchingPublisherConnectionTest, HandleInvalidResponse) {
648672 auto publisher = BatchingPublisherConnection::Create (
649673 topic,
650674 DefaultPublisherOptions (Options{}.set <pubsub::MaxBatchMessagesOption>(2 )),
651- " test-ordering-key" , mock, background.cq ());
675+ " test-ordering-key" , mock, background.cq (), mock_batch );
652676 auto r0 = publisher->Publish (
653677 {pubsub::MessageBuilder{}.SetData (" test-data-0" ).Build ()});
654678 auto r1 = publisher->Publish (
@@ -662,6 +686,8 @@ TEST(BatchingPublisherConnectionTest, HandleInvalidResponse) {
662686
663687TEST (BatchingPublisherConnectionTest, HandleErrorWithOrderingPartialBatch) {
664688 auto mock = std::make_shared<pubsub_testing::MockBatchSink>();
689+ auto mock_batch =
690+ std::make_shared<NiceMock<pubsub_testing::MockMessageBatch>>();
665691 pubsub::Topic const topic (" test-project" , " test-topic" );
666692
667693 auto const error_status = Status (StatusCode::kPermissionDenied , " uh-oh" );
@@ -682,7 +708,7 @@ TEST(BatchingPublisherConnectionTest, HandleErrorWithOrderingPartialBatch) {
682708 topic,
683709 DefaultPublisherOptions (
684710 Options{}.set <pubsub::MaxBatchMessagesOption>(kBatchSize )),
685- ordering_key, mock, cq);
711+ ordering_key, mock, cq, mock_batch );
686712 std::vector<future<StatusOr<std::string>>> results;
687713 // Create a full batch (by message count) and a partial batch.
688714 for (int i = 0 ; i != kBatchSize + kBatchSize / 2 ; ++i) {
@@ -710,6 +736,8 @@ TEST(BatchingPublisherConnectionTest, HandleErrorWithOrderingPartialBatch) {
710736
711737TEST (BatchingPublisherConnectionTest, HandleErrorWithOrderingResume) {
712738 auto mock = std::make_shared<pubsub_testing::MockBatchSink>();
739+ auto mock_batch =
740+ std::make_shared<NiceMock<pubsub_testing::MockMessageBatch>>();
713741 pubsub::Topic const topic (" test-project" , " test-topic" );
714742 auto const ordering_key = std::string{" test-key" };
715743
@@ -743,7 +771,7 @@ TEST(BatchingPublisherConnectionTest, HandleErrorWithOrderingResume) {
743771 Options{}
744772 .set <pubsub::MaxBatchMessagesOption>(kBatchSize )
745773 .set <pubsub::MaxHoldTimeOption>(kMaxHoldTime )),
746- ordering_key, mock, cq);
774+ ordering_key, mock, cq, mock_batch );
747775 std::vector<future<StatusOr<std::string>>> results;
748776 // Create a full batch (by size).
749777 for (int i = 0 ; i != kBatchSize ; ++i) {
0 commit comments