Skip to content

Commit a820ab8

Browse files
samples: publish with ordering keys (googleapis#287)
* samples: add samples for publish with ordering keys * Point to regional endpoint for ordered publish; fix output on resume publish example. Co-authored-by: Kamal Aboul-Hosn <kamal.aboulhosn@gmail.com>
1 parent 6bd5016 commit a820ab8

File tree

6 files changed

+280
-1
lines changed

6 files changed

+280
-1
lines changed

samples/snippets/src/main/java/pubsub/CreateSubscriptionWithDeadLetterPolicyExample.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package pubsub;
1818

1919
// [START pubsub_dead_letter_create_subscription]
20-
2120
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
2221
import com.google.pubsub.v1.DeadLetterPolicy;
2322
import com.google.pubsub.v1.ProjectSubscriptionName;
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package pubsub;
18+
19+
// [START pubsub_enable_subscription_ordering]
20+
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
21+
import com.google.pubsub.v1.ProjectSubscriptionName;
22+
import com.google.pubsub.v1.ProjectTopicName;
23+
import com.google.pubsub.v1.Subscription;
24+
import java.io.IOException;
25+
26+
public class CreateSubscriptionWithOrdering {
27+
public static void main(String... args) throws Exception {
28+
// TODO(developer): Replace these variables before running the sample.
29+
String projectId = "your-project-id";
30+
String topicId = "your-topic-id";
31+
String subscriptionId = "your-subscription-id";
32+
33+
createSubscriptionWithOrderingExample(projectId, topicId, subscriptionId);
34+
}
35+
36+
public static void createSubscriptionWithOrderingExample(
37+
String projectId, String topicId, String subscriptionId) throws IOException {
38+
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
39+
40+
ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
41+
ProjectSubscriptionName subscriptionName =
42+
ProjectSubscriptionName.of(projectId, subscriptionId);
43+
44+
Subscription subscription =
45+
subscriptionAdminClient.createSubscription(
46+
Subscription.newBuilder()
47+
.setName(subscriptionName.toString())
48+
.setTopic(topicName.toString())
49+
// Set message ordering to true for ordered messages in the subscription.
50+
.setEnableMessageOrdering(true)
51+
.build());
52+
53+
System.out.println("Created a subscription with ordering: " + subscription.getAllFields());
54+
}
55+
}
56+
}
57+
// [END pubsub_enable_subscription_ordering]
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package pubsub;
18+
19+
// [START pubsub_publish_with_ordering_keys]
20+
import com.google.api.core.ApiFuture;
21+
import com.google.api.core.ApiFutureCallback;
22+
import com.google.api.core.ApiFutures;
23+
import com.google.api.gax.rpc.ApiException;
24+
import com.google.cloud.pubsub.v1.Publisher;
25+
import com.google.common.util.concurrent.MoreExecutors;
26+
import com.google.protobuf.ByteString;
27+
import com.google.pubsub.v1.PubsubMessage;
28+
import com.google.pubsub.v1.TopicName;
29+
import java.io.IOException;
30+
import java.util.HashMap;
31+
import java.util.Map;
32+
import java.util.concurrent.TimeUnit;
33+
34+
public class PublishWithOrderingKeys {
35+
public static void main(String... args) throws Exception {
36+
// TODO(developer): Replace these variables before running the sample.
37+
String projectId = "your-project-id";
38+
// Choose an existing topic.
39+
String topicId = "your-topic-id";
40+
41+
publishWithOrderingKeysExample(projectId, topicId);
42+
}
43+
44+
public static void publishWithOrderingKeysExample(String projectId, String topicId)
45+
throws IOException, InterruptedException {
46+
TopicName topicName = TopicName.of(projectId, topicId);
47+
// Create a publisher and set message ordering to true.
48+
Publisher publisher =
49+
Publisher.newBuilder(topicName)
50+
.setEndpoint("us-east1-pubsub.googleapis.com:443")
51+
.setEnableMessageOrdering(true)
52+
.build();
53+
54+
try {
55+
Map<String, String> messages = new HashMap<String, String>();
56+
messages.put("message1", "key1");
57+
messages.put("message2", "key2");
58+
messages.put("message3", "key1");
59+
messages.put("message4", "key2");
60+
61+
for (Map.Entry<String, String> entry : messages.entrySet()) {
62+
ByteString data = ByteString.copyFromUtf8(entry.getKey());
63+
PubsubMessage pubsubMessage =
64+
PubsubMessage.newBuilder().setData(data).setOrderingKey(entry.getValue()).build();
65+
ApiFuture<String> future = publisher.publish(pubsubMessage);
66+
67+
// Add an asynchronous callback to handle publish success / failure.
68+
ApiFutures.addCallback(
69+
future,
70+
new ApiFutureCallback<String>() {
71+
72+
@Override
73+
public void onFailure(Throwable throwable) {
74+
if (throwable instanceof ApiException) {
75+
ApiException apiException = ((ApiException) throwable);
76+
// Details on the API exception.
77+
System.out.println(apiException.getStatusCode().getCode());
78+
System.out.println(apiException.isRetryable());
79+
}
80+
System.out.println("Error publishing message : " + pubsubMessage.getData());
81+
}
82+
83+
@Override
84+
public void onSuccess(String messageId) {
85+
// Once published, returns server-assigned message ids (unique within the topic).
86+
System.out.println(pubsubMessage.getData() + " : " + messageId);
87+
}
88+
},
89+
MoreExecutors.directExecutor());
90+
}
91+
} finally {
92+
// When finished with the publisher, shutdown to free up resources.
93+
publisher.shutdown();
94+
publisher.awaitTermination(1, TimeUnit.MINUTES);
95+
}
96+
}
97+
}
98+
// [END pubsub_publish_with_ordering_keys]
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package pubsub;
18+
19+
// [START pubsub_resume_publish_with_ordering_keys]
20+
import com.google.api.core.ApiFuture;
21+
import com.google.api.core.ApiFutureCallback;
22+
import com.google.api.core.ApiFutures;
23+
import com.google.api.gax.rpc.ApiException;
24+
import com.google.cloud.pubsub.v1.Publisher;
25+
import com.google.common.util.concurrent.MoreExecutors;
26+
import com.google.protobuf.ByteString;
27+
import com.google.pubsub.v1.PubsubMessage;
28+
import com.google.pubsub.v1.TopicName;
29+
import java.io.IOException;
30+
import java.util.HashMap;
31+
import java.util.Map;
32+
import java.util.concurrent.TimeUnit;
33+
34+
public class ResumePublishWithOrderingKeys {
35+
public static void main(String... args) throws Exception {
36+
// TODO(developer): Replace these variables before running the sample.
37+
String projectId = "your-project-id";
38+
// Choose an existing topic.
39+
String topicId = "your-topic-id";
40+
41+
resumePublishWithOrderingKeysExample(projectId, topicId);
42+
}
43+
44+
public static void resumePublishWithOrderingKeysExample(String projectId, String topicId)
45+
throws IOException, InterruptedException {
46+
TopicName topicName = TopicName.of(projectId, topicId);
47+
// Create a publisher and set message ordering to true.
48+
Publisher publisher =
49+
Publisher.newBuilder(topicName)
50+
.setEnableMessageOrdering(true)
51+
.setEndpoint("us-east1-pubsub.googleapis.com:443")
52+
.build();
53+
54+
try {
55+
Map<String, String> messages = new HashMap<String, String>();
56+
messages.put("message1", "key1");
57+
messages.put("message2", "key2");
58+
messages.put("message3", "key1");
59+
messages.put("message4", "key2");
60+
61+
for (Map.Entry<String, String> entry : messages.entrySet()) {
62+
ByteString data = ByteString.copyFromUtf8(entry.getKey());
63+
PubsubMessage pubsubMessage =
64+
PubsubMessage.newBuilder().setData(data).setOrderingKey(entry.getValue()).build();
65+
ApiFuture<String> future = publisher.publish(pubsubMessage);
66+
67+
// Add an asynchronous callback to handle publish success / failure.
68+
ApiFutures.addCallback(
69+
future,
70+
new ApiFutureCallback<String>() {
71+
72+
@Override
73+
public void onFailure(Throwable throwable) {
74+
if (throwable instanceof ApiException) {
75+
ApiException apiException = ((ApiException) throwable);
76+
// Details on the API exception.
77+
System.out.println(apiException.getStatusCode().getCode());
78+
System.out.println(apiException.isRetryable());
79+
}
80+
System.out.println("Error publishing message : " + pubsubMessage.getData());
81+
// (Beta) Must call resumePublish to reset key and continue publishing with order.
82+
publisher.resumePublish(pubsubMessage.getOrderingKey());
83+
}
84+
85+
@Override
86+
public void onSuccess(String messageId) {
87+
// Once published, returns server-assigned message ids (unique within the topic).
88+
System.out.println(pubsubMessage.getData() + " : " + messageId);
89+
}
90+
},
91+
MoreExecutors.directExecutor());
92+
}
93+
} finally {
94+
publisher.shutdown();
95+
publisher.awaitTermination(1, TimeUnit.MINUTES);
96+
}
97+
}
98+
}
99+
// [END pubsub_resume_publish_with_ordering_keys]

samples/snippets/src/test/java/pubsub/AdminIT.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,16 @@ public class AdminIT {
4343
private static final String topicId = "iam-topic-" + _suffix;
4444
private static final String pullSubscriptionId = "iam-pull-subscription-" + _suffix;
4545
private static final String pushSubscriptionId = "iam-push-subscription-" + _suffix;
46+
private static final String orderedSubscriptionId = "iam-ordered-subscription-" + _suffix;
4647
private static final String pushEndpoint = "https://my-test-project.appspot.com/push";
4748

4849
private static final ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
4950
private static final ProjectSubscriptionName pullSubscriptionName =
5051
ProjectSubscriptionName.of(projectId, pullSubscriptionId);
5152
private static final ProjectSubscriptionName pushSubscriptionName =
5253
ProjectSubscriptionName.of(projectId, pushSubscriptionId);
54+
private static final ProjectSubscriptionName orderedSubscriptionName =
55+
ProjectSubscriptionName.of(projectId, orderedSubscriptionId);
5356

5457
private static void requireEnvVar(String varName) {
5558
assertNotNull(
@@ -78,6 +81,7 @@ public void tearDown() throws Exception {
7881
try {
7982
subscriptionAdminClient.deleteSubscription(pullSubscriptionName);
8083
subscriptionAdminClient.deleteSubscription(pushSubscriptionName);
84+
subscriptionAdminClient.deleteSubscription(orderedSubscriptionName);
8185
} catch (NotFoundException e) {
8286
}
8387
}
@@ -165,10 +169,18 @@ public void testAdmin() throws Exception {
165169
assertThat(bout.toString()).contains("permissions: \"pubsub.subscriptions.consume\"");
166170
assertThat(bout.toString()).contains("permissions: \"pubsub.subscriptions.update\"");
167171

172+
bout.reset();
173+
// Test create a subscription with ordering
174+
CreateSubscriptionWithOrdering.createSubscriptionWithOrderingExample(
175+
projectId, topicId, orderedSubscriptionId);
176+
assertThat(bout.toString()).contains("Created a subscription with ordering");
177+
assertThat(bout.toString()).contains("enable_message_ordering=true");
178+
168179
bout.reset();
169180
// Test delete subscription. Run twice to delete both pull and push subscriptions.
170181
DeleteSubscriptionExample.deleteSubscriptionExample(projectId, pullSubscriptionId);
171182
DeleteSubscriptionExample.deleteSubscriptionExample(projectId, pushSubscriptionId);
183+
DeleteSubscriptionExample.deleteSubscriptionExample(projectId, orderedSubscriptionId);
172184
assertThat(bout.toString()).contains("Deleted subscription.");
173185

174186
bout.reset();

samples/snippets/src/test/java/pubsub/PublisherIT.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,5 +105,19 @@ public void testPublisher() throws Exception {
105105
// Test publish with Retry settings.
106106
PublishWithRetrySettingsExample.publishWithRetrySettingsExample(projectId, topicId);
107107
assertThat(bout.toString()).contains("Published a message with retry settings: ");
108+
109+
bout.reset();
110+
// Test publish with ordering keys.
111+
PublishWithOrderingKeys.publishWithOrderingKeysExample(projectId, topicId);
112+
for (int i = 1; i <= 4; i++) {
113+
assertThat(bout.toString()).contains("message" + i);
114+
}
115+
116+
bout.reset();
117+
// Test resume publish with ordering keys.
118+
ResumePublishWithOrderingKeys.resumePublishWithOrderingKeysExample(projectId, topicId);
119+
for (int i = 1; i <= 4; i++) {
120+
assertThat(bout.toString()).contains("message" + i);
121+
}
108122
}
109123
}

0 commit comments

Comments
 (0)