Skip to content

Commit d4e14a7

Browse files
author
remyabel
authored
feat(pubsub): add publish_with_ordering_keys sample (#5059)
1 parent 385ebd2 commit d4e14a7

3 files changed

Lines changed: 55 additions & 0 deletions

File tree

google/cloud/pubsub/samples/pubsub_samples_common.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
#include "google/cloud/pubsub/samples/pubsub_samples_common.h"
16+
#include "google/cloud/internal/getenv.h"
1617
#include <sstream>
1718

1819
namespace google {
@@ -111,6 +112,10 @@ CreateSubscriptionAdminCommand(std::string const& name,
111112
std::move(adapter)};
112113
}
113114

115+
bool UsingEmulator() {
116+
return google::cloud::internal::GetEnv("PUBSUB_EMULATOR_HOST").has_value();
117+
}
118+
114119
} // namespace examples
115120
} // namespace pubsub
116121
} // namespace cloud

google/cloud/pubsub/samples/pubsub_samples_common.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ CreateSubscriptionAdminCommand(std::string const& name,
5656
std::vector<std::string> const& arg_names,
5757
SubscriptionAdminCommand const& command);
5858

59+
bool UsingEmulator();
60+
5961
} // namespace examples
6062
} // namespace pubsub
6163
} // namespace cloud

google/cloud/pubsub/samples/samples.cc

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,43 @@ void PublishCustomAttributes(google::cloud::pubsub::Publisher publisher,
541541
(std::move(publisher));
542542
}
543543

544+
void PublishOrderingKey(google::cloud::pubsub::Publisher publisher,
545+
std::vector<std::string> const&) {
546+
//! [START pubsub_publish_with_ordering_keys] [publish-with-ordering-keys]
547+
namespace pubsub = google::cloud::pubsub;
548+
using google::cloud::future;
549+
using google::cloud::StatusOr;
550+
[](pubsub::Publisher publisher) {
551+
struct SampleData {
552+
std::string ordering_key;
553+
std::string data;
554+
} data[] = {
555+
{"key1", "message1"}, {"key2", "message2"}, {"key1", "message3"},
556+
{"key1", "message4"}, {"key1", "message5"},
557+
};
558+
std::vector<future<void>> done;
559+
for (auto const& datum : data) {
560+
auto message_id =
561+
publisher.Publish(pubsub::MessageBuilder{}
562+
.SetData("Hello World! [" + datum.data + "]")
563+
.SetOrderingKey(datum.ordering_key)
564+
.Build());
565+
std::string ack_id = datum.ordering_key + "#" + datum.data;
566+
done.push_back(message_id.then([ack_id](future<StatusOr<std::string>> f) {
567+
auto id = f.get();
568+
if (!id) throw std::runtime_error(id.status().message());
569+
std::cout << "Message " << ack_id << " published with id=" << *id
570+
<< "\n";
571+
}));
572+
}
573+
publisher.Flush();
574+
// Block until all the messages are published (optional)
575+
for (auto& f : done) f.get();
576+
}
577+
//! [END pubsub_publish_with_ordering_keys] [publish-with-ordering-keys]
578+
(std::move(publisher));
579+
}
580+
544581
void Subscribe(google::cloud::pubsub::Subscriber subscriber,
545582
google::cloud::pubsub::Subscription const& subscription,
546583
std::vector<std::string> const&) {
@@ -1049,6 +1086,7 @@ void SubscriberRetrySettings(std::vector<std::string> const& argv) {
10491086

10501087
void AutoRun(std::vector<std::string> const& argv) {
10511088
namespace examples = ::google::cloud::testing_util;
1089+
using ::google::cloud::pubsub::examples::UsingEmulator;
10521090

10531091
if (!argv.empty()) throw examples::Usage{"auto"};
10541092
examples::CheckEnvironmentVariablesAreSet({
@@ -1210,6 +1248,15 @@ void AutoRun(std::vector<std::string> const& argv) {
12101248
std::cout << "\nRunning SubscribeCustomAttributes() sample" << std::endl;
12111249
SubscribeCustomAttributes(subscriber, subscription, {});
12121250

1251+
auto publisher_with_ordering_key = google::cloud::pubsub::Publisher(
1252+
google::cloud::pubsub::MakePublisherConnection(
1253+
topic, google::cloud::pubsub::PublisherOptions{}
1254+
.set_maximum_message_count(1)
1255+
.enable_message_ordering()));
1256+
std::cout << "\nRunning PublishOrderingKey() sample" << std::endl;
1257+
1258+
if (UsingEmulator()) PublishOrderingKey(publisher_with_ordering_key, {});
1259+
12131260
std::cout << "\nRunning Publish() sample [3]" << std::endl;
12141261
Publish(publisher, {});
12151262

@@ -1312,6 +1359,7 @@ int main(int argc, char* argv[]) { // NOLINT(bugprone-exception-escape)
13121359
CreatePublisherCommand("publish", {}, Publish),
13131360
CreatePublisherCommand("publish-custom-attributes", {},
13141361
PublishCustomAttributes),
1362+
CreatePublisherCommand("publish-ordering-key", {}, PublishOrderingKey),
13151363
CreateSubscriberCommand("subscribe", {}, Subscribe),
13161364
CreateSubscriberCommand("subscribe-error-listener", {},
13171365
SubscribeErrorListener),

0 commit comments

Comments
 (0)