diff --git a/samples/snippets/README.rst b/samples/snippets/README.rst index 40b2e21fc..2c67c2c11 100644 --- a/samples/snippets/README.rst +++ b/samples/snippets/README.rst @@ -110,11 +110,11 @@ To run this sample: .. code-block:: bash - $ python publisher.py + $ python publisher.py --help usage: publisher.py [-h] project_id - {list,create,delete,publish,publish-with-custom-attributes,publish-with-error-handler,publish-with-batch-settings,publish-with-retry-settings} + {list,create,delete,publish,publish-with-custom-attributes,publish-with-error-handler,publish-with-batch-settings,publish-with-retry-settings,publish-with-ordering-keys,resume-publish-with-ordering-keys} ... This application demonstrates how to perform basic operations on topics @@ -125,7 +125,7 @@ To run this sample: positional arguments: project_id Your Google Cloud project ID - {list,create,delete,publish,publish-with-custom-attributes,publish-with-error-handler,publish-with-batch-settings,publish-with-retry-settings} + {list,create,delete,publish,publish-with-custom-attributes,publish-with-error-handler,publish-with-batch-settings,publish-with-retry-settings,publish-with-ordering-keys,resume-publish-with-ordering-keys} list Lists all Pub/Sub topics in the given project. create Create a new Pub/Sub topic. delete Deletes an existing Pub/Sub topic. @@ -141,6 +141,11 @@ To run this sample: batch settings. publish-with-retry-settings Publishes messages with custom retry settings. + publish-with-ordering-keys + Publishes messages with ordering keys. + resume-publish-with-ordering-keys + Resume publishing messages with ordering keys when + unrecoverable errors occur. optional arguments: -h, --help show this help message and exit @@ -160,11 +165,11 @@ To run this sample: .. code-block:: bash - $ python subscriber.py + $ python subscriber.py --help usage: subscriber.py [-h] project_id - {list-in-topic,list-in-project,create,create-with-dead-letter-policy,create-push,delete,update-push,update-dead-letter-policy,remove-dead-letter-policy,receive,receive-custom-attributes,receive-flow-control,receive-synchronously,receive-synchronously-with-lease,listen-for-errors,receive-messages-with-delivery-attempts} + {list-in-topic,list-in-project,create,create-with-dead-letter-policy,create-push,create-with-ordering,delete,update-push,update-dead-letter-policy,remove-dead-letter-policy,receive,receive-custom-attributes,receive-flow-control,receive-synchronously,receive-synchronously-with-lease,listen-for-errors,receive-messages-with-delivery-attempts} ... This application demonstrates how to perform basic operations on @@ -175,13 +180,15 @@ To run this sample: positional arguments: project_id Your Google Cloud project ID - {list-in-topic,list-in-project,create,create-with-dead-letter-policy,create-push,delete,update-push,update-dead-letter-policy,remove-dead-letter-policy,receive,receive-custom-attributes,receive-flow-control,receive-synchronously,receive-synchronously-with-lease,listen-for-errors,receive-messages-with-delivery-attempts} + {list-in-topic,list-in-project,create,create-with-dead-letter-policy,create-push,create-with-ordering,delete,update-push,update-dead-letter-policy,remove-dead-letter-policy,receive,receive-custom-attributes,receive-flow-control,receive-synchronously,receive-synchronously-with-lease,listen-for-errors,receive-messages-with-delivery-attempts} list-in-topic Lists all subscriptions for a given topic. list-in-project Lists all subscriptions in the current project. create Create a new pull subscription on the given topic. create-with-dead-letter-policy Create a subscription with dead letter policy. create-push Create a new push subscription on the given topic. + create-with-ordering + Create a subscription with dead letter policy. delete Deletes an existing Pub/Sub topic. update-push Updates an existing Pub/Sub subscription's push endpoint URL. Note that certain properties of a @@ -208,7 +215,6 @@ To run this sample: -h, --help show this help message and exit - Identity and Access Management +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ @@ -221,20 +227,15 @@ Identity and Access Management To run this sample: .. code-block:: bash - $ python iam.py - usage: iam.py [-h] project {get-topic-policy,get-subscription-policy,set-topic-policy,set-subscription-policy,check-topic-permissions,check-subscription-permissions} ... - This application demonstrates how to perform basic operations on IAM policies with the Cloud Pub/Sub API. - For more information, see the README.md under /pubsub and the documentation at https://cloud.google.com/pubsub/docs. - positional arguments: project Your Google Cloud project ID {get-topic-policy,get-subscription-policy,set-topic-policy,set-subscription-policy,check-topic-permissions,check-subscription-permissions} @@ -250,14 +251,10 @@ To run this sample: check-subscription-permissions Checks to which permissions are available on the given subscription. - optional arguments: -h, --help show this help message and exit - - - The client library ------------------------------------------------------------------------------- @@ -273,4 +270,4 @@ to `browse the source`_ and `report issues`_. https://github.com/GoogleCloudPlatform/google-cloud-python/issues -.. _Google Cloud SDK: https://cloud.google.com/sdk/ \ No newline at end of file +.. _Google Cloud SDK: https://cloud.google.com/sdk/ diff --git a/samples/snippets/publisher.py b/samples/snippets/publisher.py index 477b31b9c..399d37679 100644 --- a/samples/snippets/publisher.py +++ b/samples/snippets/publisher.py @@ -273,6 +273,94 @@ def publish_messages_with_retry_settings(project_id, topic_id): # [END pubsub_publisher_retry_settings] +def publish_with_ordering_keys(project_id, topic_id): + """Publishes messages with ordering keys.""" + # [START pubsub_publish_with_ordering_keys] + from google.cloud import pubsub_v1 + + # TODO(developer): Choose an existing topic. + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + publisher_options = pubsub_v1.types.PublisherOptions( + enable_message_ordering=True + ) + # Sending messages to the same region ensures they are received in order + # even when multiple publishers are used. + client_options = {"api_endpoint": "us-east1-pubsub.googleapis.com:443"} + publisher = pubsub_v1.PublisherClient( + publisher_options=publisher_options, + client_options=client_options + ) + # The `topic_path` method creates a fully qualified identifier + # in the form `projects/{project_id}/topics/{topic_id}` + topic_path = publisher.topic_path(project_id, topic_id) + + for message in [ + ("message1", "key1"), + ("message2", "key2"), + ("message3", "key1"), + ("message4", "key2"), + ]: + # Data must be a bytestring + data = message[0].encode("utf-8") + ordering_key = message[1] + # When you publish a message, the client returns a future. + future = publisher.publish( + topic_path, data=data, ordering_key=ordering_key + ) + print(future.result()) + + print("Published messages with ordering keys.") + # [END pubsub_publish_with_ordering_keys] + + +def resume_publish_with_ordering_keys(project_id, topic_id): + """Resume publishing messages with ordering keys when unrecoverable errors occur.""" + # [START pubsub_resume_publish_with_ordering_keys] + from google.cloud import pubsub_v1 + + # TODO(developer): Choose an existing topic. + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + publisher_options = pubsub_v1.types.PublisherOptions( + enable_message_ordering=True + ) + # Sending messages to the same region ensures they are received in order + # even when multiple publishers are used. + client_options = {"api_endpoint": "us-east1-pubsub.googleapis.com:443"} + publisher = pubsub_v1.PublisherClient( + publisher_options=publisher_options, + client_options=client_options + ) + # The `topic_path` method creates a fully qualified identifier + # in the form `projects/{project_id}/topics/{topic_id}` + topic_path = publisher.topic_path(project_id, topic_id) + + for message in [ + ("message1", "key1"), + ("message2", "key2"), + ("message3", "key1"), + ("message4", "key2"), + ]: + # Data must be a bytestring + data = message[0].encode("utf-8") + ordering_key = message[1] + # When you publish a message, the client returns a future. + future = publisher.publish( + topic_path, data=data, ordering_key=ordering_key + ) + try: + print(future.result()) + except RuntimeError: + # Resume publish on an ordering key that has had unrecoverable errors. + publisher.resume_publish(topic_path, ordering_key) + + print("Published messages with ordering keys.") + # [END pubsub_resume_publish_with_ordering_keys] + + if __name__ == "__main__": parser = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, @@ -288,7 +376,9 @@ def publish_messages_with_retry_settings(project_id, topic_id): delete_parser = subparsers.add_parser("delete", help=delete_topic.__doc__) delete_parser.add_argument("topic_id") - publish_parser = subparsers.add_parser("publish", help=publish_messages.__doc__) + publish_parser = subparsers.add_parser( + "publish", help=publish_messages.__doc__ + ) publish_parser.add_argument("topic_id") publish_with_custom_attributes_parser = subparsers.add_parser( @@ -298,7 +388,8 @@ def publish_messages_with_retry_settings(project_id, topic_id): publish_with_custom_attributes_parser.add_argument("topic_id") publish_with_error_handler_parser = subparsers.add_parser( - "publish-with-error-handler", help=publish_messages_with_error_handler.__doc__, + "publish-with-error-handler", + help=publish_messages_with_error_handler.__doc__, ) publish_with_error_handler_parser.add_argument("topic_id") @@ -314,6 +405,17 @@ def publish_messages_with_retry_settings(project_id, topic_id): ) publish_with_retry_settings_parser.add_argument("topic_id") + publish_with_ordering_keys_parser = subparsers.add_parser( + "publish-with-ordering-keys", help=publish_with_ordering_keys.__doc__, + ) + publish_with_ordering_keys_parser.add_argument("topic_id") + + resume_publish_with_ordering_keys_parser = subparsers.add_parser( + "resume-publish-with-ordering-keys", + help=resume_publish_with_ordering_keys.__doc__, + ) + resume_publish_with_ordering_keys_parser.add_argument("topic_id") + args = parser.parse_args() if args.command == "list": @@ -332,3 +434,7 @@ def publish_messages_with_retry_settings(project_id, topic_id): publish_messages_with_batch_settings(args.project_id, args.topic_id) elif args.command == "publish-with-retry-settings": publish_messages_with_retry_settings(args.project_id, args.topic_id) + elif args.command == "publish-with-ordering-keys": + publish_with_ordering_keys(args.project_id, args.topic_id) + elif args.command == "resume-publish-with-ordering-keys": + resume_publish_with_ordering_keys(args.project_id, args.topic_id) diff --git a/samples/snippets/publisher_test.py b/samples/snippets/publisher_test.py index b5c2ea1ea..95fda846a 100644 --- a/samples/snippets/publisher_test.py +++ b/samples/snippets/publisher_test.py @@ -144,3 +144,17 @@ def test_publish_with_error_handler(topic_publish, capsys): out, _ = capsys.readouterr() assert "Published" in out + + +def test_publish_with_ordering_keys(topic_publish, capsys): + publisher.publish_with_ordering_keys(PROJECT, TOPIC_PUBLISH) + + out, _ = capsys.readouterr() + assert "Published messages with ordering keys." in out + + +def test_resume_publish_with_error_handler(topic_publish, capsys): + publisher.resume_publish_with_ordering_keys(PROJECT, TOPIC_PUBLISH) + + out, _ = capsys.readouterr() + assert "Published messages with ordering keys." in out diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py index aeffb80d7..94e1c5cd4 100644 --- a/samples/snippets/subscriber.py +++ b/samples/snippets/subscriber.py @@ -160,6 +160,28 @@ def create_push_subscription(project_id, topic_id, subscription_id, endpoint): # [END pubsub_create_push_subscription] +def create_subscription_with_ordering(project_id, topic_id, subscription_id): + """Create a subscription with dead letter policy.""" + # [START pubsub_enable_subscription_ordering] + from google.cloud import pubsub_v1 + + # TODO(developer): Choose an existing topic. + # project_id = "your-project-id" + # topic_id = "your-topic-id" + # subscription_id = "your-subscription-id" + + subscriber = pubsub_v1.SubscriberClient() + topic_path = subscriber.topic_path(project_id, topic_id) + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + with subscriber: + subscription = subscriber.create_subscription( + subscription_path, topic_path, enable_message_ordering=True + ) + print("Created subscription with ordering: {}".format(subscription)) + # [END pubsub_enable_subscription_ordering] + + def delete_subscription(project_id, subscription_id): """Deletes an existing Pub/Sub topic.""" # [START pubsub_delete_subscription] @@ -654,6 +676,12 @@ def callback(message): create_push_parser.add_argument("subscription_id") create_push_parser.add_argument("endpoint") + create_subscription_with_ordering_parser = subparsers.add_parser( + "create-with-ordering", help=create_subscription_with_ordering.__doc__ + ) + create_subscription_with_ordering_parser.add_argument("topic_id") + create_subscription_with_ordering_parser.add_argument("subscription_id") + delete_parser = subparsers.add_parser("delete", help=delete_subscription.__doc__) delete_parser.add_argument("subscription_id") @@ -746,6 +774,10 @@ def callback(message): create_push_subscription( args.project_id, args.topic_id, args.subscription_id, args.endpoint, ) + elif args.command == "create-with-ordering": + create_subscription_with_ordering( + args.project_id, args.topic_id, args.subscription_id + ) elif args.command == "delete": delete_subscription(args.project_id, args.subscription_id) elif args.command == "update-push": diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py index a7f7c139c..62018e9a9 100644 --- a/samples/snippets/subscriber_test.py +++ b/samples/snippets/subscriber_test.py @@ -29,6 +29,7 @@ SUBSCRIPTION_ASYNC = "subscription-test-subscription-async-" + UUID SUBSCRIPTION_SYNC = "subscription-test-subscription-sync-" + UUID SUBSCRIPTION_DLQ = "subscription-test-subscription-dlq-" + UUID +SUBSCRIPTION_ORDERING = "subscription-test-subscription-ordering-" + UUID ENDPOINT = "https://{}.appspot.com/push".format(PROJECT) NEW_ENDPOINT = "https://{}.appspot.com/push2".format(PROJECT) @@ -209,6 +210,16 @@ def eventually_consistent_test(): eventually_consistent_test() +def test_create_subscription_with_ordering(subscriber_client, capsys): + subscriber.create_subscription_with_ordering(PROJECT, TOPIC, SUBSCRIPTION_ORDERING) + out, _ = capsys.readouterr() + assert "Created subscription with ordering" in out + assert "enable_message_ordering: true" in out + + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ORDERING) + subscriber_client.delete_subscription(subscription_path) + + def test_update(subscriber_client, subscription_admin, capsys): subscriber.update_push_subscription( PROJECT, TOPIC, SUBSCRIPTION_ADMIN, NEW_ENDPOINT