Skip to content

Commit 44a4a32

Browse files
samples: create BigQuery subscription (#722)
* chore: Remove notes about ordering keys being experimental. * Revert "chore: Remove notes about ordering keys being experimental." This reverts commit 38b2a3e91dd4f3f3c6657f4660fa1df8c0239124. * feat: Add support for server-side flow control * Add unit test for flow control * samples: create BigQuery subscription * samples: update BigQuery subscription test * Fix linter error for PR 722 * samples: create BigQuery subscription fix unused variable Co-authored-by: Anna Cocuzzo <63511057+acocuzzo@users.noreply.github.com>
1 parent e303170 commit 44a4a32

4 files changed

Lines changed: 113 additions & 3 deletions

File tree

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
backoff==2.0.1
22
pytest==7.1.2
33
mock==4.0.3
4-
flaky==3.7.0
4+
flaky==3.7.0
5+
google-cloud-bigquery==1.28.0
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
google-cloud-pubsub==2.12.1
1+
google-cloud-pubsub==2.13.0
22
avro==1.11.0

packages/google-cloud-pubsub/samples/snippets/subscriber.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,42 @@ def create_subscription_with_exactly_once_delivery(
273273
# [END pubsub_create_subscription_with_exactly_once_delivery]
274274

275275

276+
def create_bigquery_subscription(
277+
project_id: str, topic_id: str, subscription_id: str, bigquery_table_id: str
278+
) -> None:
279+
"""Create a new BigQuery subscription on the given topic."""
280+
# [START pubsub_create_bigquery_subscription]
281+
from google.cloud import pubsub_v1
282+
283+
# TODO(developer)
284+
# project_id = "your-project-id"
285+
# topic_id = "your-topic-id"
286+
# subscription_id = "your-subscription-id"
287+
# bigquery_table_id = "your-project.your-dataset.your-table"
288+
289+
publisher = pubsub_v1.PublisherClient()
290+
subscriber = pubsub_v1.SubscriberClient()
291+
topic_path = publisher.topic_path(project_id, topic_id)
292+
subscription_path = subscriber.subscription_path(project_id, subscription_id)
293+
294+
bigquery_config = pubsub_v1.types.BigQueryConfig(table=bigquery_table_id, write_metadata=True)
295+
296+
# Wrap the subscriber in a 'with' block to automatically call close() to
297+
# close the underlying gRPC channel when done.
298+
with subscriber:
299+
subscription = subscriber.create_subscription(
300+
request={
301+
"name": subscription_path,
302+
"topic": topic_path,
303+
"bigquery_config": bigquery_config,
304+
}
305+
)
306+
307+
print(f"BigQuery subscription created: {subscription}.")
308+
print(f"Table for subscription is: {bigquery_table_id}")
309+
# [END pubsub_create_bigquery_subscription]
310+
311+
276312
def delete_subscription(project_id: str, subscription_id: str) -> None:
277313
"""Deletes an existing Pub/Sub topic."""
278314
# [START pubsub_delete_subscription]
@@ -922,6 +958,14 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
922958
"subscription_id"
923959
)
924960

961+
create_bigquery_subscription_parser = subparsers.add_parser(
962+
"create-biquery",
963+
help=create_bigquery_subscription.__doc__,
964+
)
965+
create_bigquery_subscription_parser.add_argument("topic_id")
966+
create_bigquery_subscription_parser.add_argument("subscription_id")
967+
create_bigquery_subscription_parser.add_argument("bigquery_table_id")
968+
925969
delete_parser = subparsers.add_parser("delete", help=delete_subscription.__doc__)
926970
delete_parser.add_argument("subscription_id")
927971

@@ -1050,6 +1094,13 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
10501094
create_subscription_with_exactly_once_delivery(
10511095
args.project_id, args.topic_id, args.subscription_id
10521096
)
1097+
elif args.command == "create-bigquery":
1098+
create_bigquery_subscription(
1099+
args.project_id,
1100+
args.topic_id,
1101+
args.subscription_id,
1102+
args.bigquery_table_id,
1103+
)
10531104
elif args.command == "delete":
10541105
delete_subscription(args.project_id, args.subscription_id)
10551106
elif args.command == "update-push":

packages/google-cloud-pubsub/samples/snippets/subscriber_test.py

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,15 @@
2323
import backoff
2424
from flaky import flaky
2525
from google.api_core.exceptions import NotFound
26-
from google.cloud import pubsub_v1
26+
from google.cloud import bigquery, pubsub_v1
2727
import pytest
2828

2929
import subscriber
3030

3131
# This uuid is shared across tests which run in parallel.
3232
UUID = uuid.uuid4().hex
3333
PY_VERSION = f"{sys.version_info.major}.{sys.version_info.minor}"
34+
UNDERSCORE_PY_VERSION = PY_VERSION.replace(".", "_")
3435
PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
3536
TOPIC = f"subscription-test-topic-{PY_VERSION}-{UUID}"
3637
DEAD_LETTER_TOPIC = f"subscription-test-dead-letter-topic-{PY_VERSION}-{UUID}"
@@ -42,6 +43,8 @@
4243
DEFAULT_MAX_DELIVERY_ATTEMPTS = 5
4344
UPDATED_MAX_DELIVERY_ATTEMPTS = 20
4445
FILTER = 'attributes.author="unknown"'
46+
BIGQUERY_DATASET_ID = f"python_samples_dataset_{UNDERSCORE_PY_VERSION}_{UUID}"
47+
BIGQUERY_TABLE_ID = f"python_samples_table_{UNDERSCORE_PY_VERSION}_{UUID}"
4548

4649
C = TypeVar("C", bound=Callable[..., Any])
4750

@@ -545,6 +548,61 @@ def test_update_push_subscription(
545548
subscriber_client.delete_subscription(request={"subscription": subscription_path})
546549

547550

551+
@pytest.fixture(scope="module")
552+
def bigquery_table() -> Generator[str, None, None]:
553+
client = bigquery.Client()
554+
dataset = bigquery.Dataset(f"{PROJECT_ID}.{BIGQUERY_DATASET_ID}")
555+
dataset.location = "US"
556+
dataset = client.create_dataset(dataset)
557+
558+
table_id = f"{PROJECT_ID}.{BIGQUERY_DATASET_ID}.{BIGQUERY_TABLE_ID}"
559+
schema = [
560+
bigquery.SchemaField("data", "STRING", mode="REQUIRED"),
561+
bigquery.SchemaField("message_id", "STRING", mode="REQUIRED"),
562+
bigquery.SchemaField("attributes", "STRING", mode="REQUIRED"),
563+
bigquery.SchemaField("subscription_name", "STRING", mode="REQUIRED"),
564+
bigquery.SchemaField("publish_time", "TIMESTAMP", mode="REQUIRED"),
565+
]
566+
567+
table = bigquery.Table(table_id, schema=schema)
568+
table = client.create_table(table)
569+
570+
yield table_id
571+
572+
client.delete_dataset(dataset, delete_contents=True)
573+
574+
575+
def test_create_bigquery_subscription(
576+
subscriber_client: pubsub_v1.SubscriberClient,
577+
topic: str,
578+
bigquery_table: str,
579+
capsys: CaptureFixture[str],
580+
) -> None:
581+
bigquery_subscription_for_create_name = (
582+
f"subscription-test-subscription-bigquery-for-create-{PY_VERSION}-{UUID}"
583+
)
584+
585+
subscription_path = subscriber_client.subscription_path(
586+
PROJECT_ID, bigquery_subscription_for_create_name
587+
)
588+
try:
589+
subscriber_client.delete_subscription(
590+
request={"subscription": subscription_path}
591+
)
592+
except NotFound:
593+
pass
594+
595+
subscriber.create_bigquery_subscription(
596+
PROJECT_ID, TOPIC, bigquery_subscription_for_create_name, bigquery_table
597+
)
598+
599+
out, _ = capsys.readouterr()
600+
assert f"{bigquery_subscription_for_create_name}" in out
601+
602+
# Clean up.
603+
subscriber_client.delete_subscription(request={"subscription": subscription_path})
604+
605+
548606
def test_delete_subscription(
549607
subscriber_client: pubsub_v1.SubscriberClient,
550608
topic: str,

0 commit comments

Comments
 (0)