diff --git a/.kokoro/samples/python3.6/periodic-head.cfg b/.kokoro/samples/python3.6/periodic-head.cfg new file mode 100644 index 00000000..f9cfcd33 --- /dev/null +++ b/.kokoro/samples/python3.6/periodic-head.cfg @@ -0,0 +1,11 @@ +# Format: //devtools/kokoro/config/proto/build.proto + +env_vars: { + key: "INSTALL_LIBRARY_FROM_SOURCE" + value: "True" +} + +env_vars: { + key: "TRAMPOLINE_BUILD_FILE" + value: "github/python-pubsub/.kokoro/test-samples-against-head.sh" +} diff --git a/.kokoro/samples/python3.7/periodic-head.cfg b/.kokoro/samples/python3.7/periodic-head.cfg new file mode 100644 index 00000000..f9cfcd33 --- /dev/null +++ b/.kokoro/samples/python3.7/periodic-head.cfg @@ -0,0 +1,11 @@ +# Format: //devtools/kokoro/config/proto/build.proto + +env_vars: { + key: "INSTALL_LIBRARY_FROM_SOURCE" + value: "True" +} + +env_vars: { + key: "TRAMPOLINE_BUILD_FILE" + value: "github/python-pubsub/.kokoro/test-samples-against-head.sh" +} diff --git a/.kokoro/samples/python3.8/periodic-head.cfg b/.kokoro/samples/python3.8/periodic-head.cfg new file mode 100644 index 00000000..f9cfcd33 --- /dev/null +++ b/.kokoro/samples/python3.8/periodic-head.cfg @@ -0,0 +1,11 @@ +# Format: //devtools/kokoro/config/proto/build.proto + +env_vars: { + key: "INSTALL_LIBRARY_FROM_SOURCE" + value: "True" +} + +env_vars: { + key: "TRAMPOLINE_BUILD_FILE" + value: "github/python-pubsub/.kokoro/test-samples-against-head.sh" +} diff --git a/.kokoro/test-samples-against-head.sh b/.kokoro/test-samples-against-head.sh new file mode 100755 index 00000000..36ba35b5 --- /dev/null +++ b/.kokoro/test-samples-against-head.sh @@ -0,0 +1,28 @@ +#!/bin/bash +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# A customized test runner for samples. +# +# For periodic builds, you can specify this file for testing against head. + +# `-e` enables the script to automatically fail when a command fails +# `-o pipefail` sets the exit code to the rightmost comment to exit with a non-zero +set -eo pipefail +# Enables `**` to include files nested inside sub-folders +shopt -s globstar + +cd github/python-bigquery-storage + +exec .kokoro/test-samples-impl.sh diff --git a/.kokoro/test-samples-impl.sh b/.kokoro/test-samples-impl.sh new file mode 100755 index 00000000..cf5de74c --- /dev/null +++ b/.kokoro/test-samples-impl.sh @@ -0,0 +1,102 @@ +#!/bin/bash +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +# `-e` enables the script to automatically fail when a command fails +# `-o pipefail` sets the exit code to the rightmost comment to exit with a non-zero +set -eo pipefail +# Enables `**` to include files nested inside sub-folders +shopt -s globstar + +# Exit early if samples directory doesn't exist +if [ ! -d "./samples" ]; then + echo "No tests run. `./samples` not found" + exit 0 +fi + +# Disable buffering, so that the logs stream through. +export PYTHONUNBUFFERED=1 + +# Debug: show build environment +env | grep KOKORO + +# Install nox +python3.6 -m pip install --upgrade --quiet nox + +# Use secrets acessor service account to get secrets +if [[ -f "${KOKORO_GFILE_DIR}/secrets_viewer_service_account.json" ]]; then + gcloud auth activate-service-account \ + --key-file="${KOKORO_GFILE_DIR}/secrets_viewer_service_account.json" \ + --project="cloud-devrel-kokoro-resources" +fi + +# This script will create 3 files: +# - testing/test-env.sh +# - testing/service-account.json +# - testing/client-secrets.json +./scripts/decrypt-secrets.sh + +source ./testing/test-env.sh +export GOOGLE_APPLICATION_CREDENTIALS=$(pwd)/testing/service-account.json + +# For cloud-run session, we activate the service account for gcloud sdk. +gcloud auth activate-service-account \ + --key-file "${GOOGLE_APPLICATION_CREDENTIALS}" + +export GOOGLE_CLIENT_SECRETS=$(pwd)/testing/client-secrets.json + +echo -e "\n******************** TESTING PROJECTS ********************" + +# Switch to 'fail at end' to allow all tests to complete before exiting. +set +e +# Use RTN to return a non-zero value if the test fails. +RTN=0 +ROOT=$(pwd) +# Find all requirements.txt in the samples directory (may break on whitespace). +for file in samples/**/requirements.txt; do + cd "$ROOT" + # Navigate to the project folder. + file=$(dirname "$file") + cd "$file" + + echo "------------------------------------------------------------" + echo "- testing $file" + echo "------------------------------------------------------------" + + # Use nox to execute the tests for the project. + python3.6 -m nox -s "$RUN_TESTS_SESSION" + EXIT=$? + + # If this is a periodic build, send the test log to the FlakyBot. + # See https://github.com/googleapis/repo-automation-bots/tree/master/packages/flakybot. + if [[ $KOKORO_BUILD_ARTIFACTS_SUBDIR = *"periodic"* ]]; then + chmod +x $KOKORO_GFILE_DIR/linux_amd64/flakybot + $KOKORO_GFILE_DIR/linux_amd64/flakybot + fi + + if [[ $EXIT -ne 0 ]]; then + RTN=1 + echo -e "\n Testing failed: Nox returned a non-zero exit code. \n" + else + echo -e "\n Testing completed.\n" + fi + +done +cd "$ROOT" + +# Workaround for Kokoro permissions issue: delete secrets +rm testing/{test-env.sh,client-secrets.json,service-account.json} + +exit "$RTN" diff --git a/.kokoro/test-samples.sh b/.kokoro/test-samples.sh index 056e11a3..3ef99a86 100755 --- a/.kokoro/test-samples.sh +++ b/.kokoro/test-samples.sh @@ -13,6 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +# The default test runner for samples. +# +# For periodic builds, we rewinds the repo to the latest release, and +# run test-samples-impl.sh. # `-e` enables the script to automatically fail when a command fails # `-o pipefail` sets the exit code to the rightmost comment to exit with a non-zero @@ -24,87 +28,19 @@ cd github/python-bigquery-storage # Run periodic samples tests at latest release if [[ $KOKORO_BUILD_ARTIFACTS_SUBDIR = *"periodic"* ]]; then + # preserving the test runner implementation. 
+ cp .kokoro/test-samples-impl.sh "${TMPDIR}/test-samples-impl.sh" + echo "--- IMPORTANT IMPORTANT IMPORTANT ---" + echo "Now we rewind the repo back to the latest release..." LATEST_RELEASE=$(git describe --abbrev=0 --tags) git checkout $LATEST_RELEASE -fi - -# Exit early if samples directory doesn't exist -if [ ! -d "./samples" ]; then - echo "No tests run. `./samples` not found" - exit 0 -fi - -# Disable buffering, so that the logs stream through. -export PYTHONUNBUFFERED=1 - -# Debug: show build environment -env | grep KOKORO - -# Install nox -python3.6 -m pip install --upgrade --quiet nox - -# Use secrets acessor service account to get secrets -if [[ -f "${KOKORO_GFILE_DIR}/secrets_viewer_service_account.json" ]]; then - gcloud auth activate-service-account \ - --key-file="${KOKORO_GFILE_DIR}/secrets_viewer_service_account.json" \ - --project="cloud-devrel-kokoro-resources" -fi - -# This script will create 3 files: -# - testing/test-env.sh -# - testing/service-account.json -# - testing/client-secrets.json -./scripts/decrypt-secrets.sh - -source ./testing/test-env.sh -export GOOGLE_APPLICATION_CREDENTIALS=$(pwd)/testing/service-account.json - -# For cloud-run session, we activate the service account for gcloud sdk. -gcloud auth activate-service-account \ - --key-file "${GOOGLE_APPLICATION_CREDENTIALS}" - -export GOOGLE_CLIENT_SECRETS=$(pwd)/testing/client-secrets.json - -echo -e "\n******************** TESTING PROJECTS ********************" - -# Switch to 'fail at end' to allow all tests to complete before exiting. -set +e -# Use RTN to return a non-zero value if the test fails. -RTN=0 -ROOT=$(pwd) -# Find all requirements.txt in the samples directory (may break on whitespace). -for file in samples/**/requirements.txt; do - cd "$ROOT" - # Navigate to the project folder. - file=$(dirname "$file") - cd "$file" - - echo "------------------------------------------------------------" - echo "- testing $file" - echo "------------------------------------------------------------" - - # Use nox to execute the tests for the project. - python3.6 -m nox -s "$RUN_TESTS_SESSION" - EXIT=$? - - # If this is a periodic build, send the test log to the FlakyBot. - # See https://github.com/googleapis/repo-automation-bots/tree/master/packages/flakybot. - if [[ $KOKORO_BUILD_ARTIFACTS_SUBDIR = *"periodic"* ]]; then - chmod +x $KOKORO_GFILE_DIR/linux_amd64/flakybot - $KOKORO_GFILE_DIR/linux_amd64/flakybot + echo "The current head is: " + echo $(git rev-parse --verify HEAD) + echo "--- IMPORTANT IMPORTANT IMPORTANT ---" + # move back the test runner implementation if there's no file. + if [ ! -f .kokoro/test-samples-impl.sh ]; then + cp "${TMPDIR}/test-samples-impl.sh" .kokoro/test-samples-impl.sh fi +fi - if [[ $EXIT -ne 0 ]]; then - RTN=1 - echo -e "\n Testing failed: Nox returned a non-zero exit code. \n" - else - echo -e "\n Testing completed.\n" - fi - -done -cd "$ROOT" - -# Workaround for Kokoro permissions issue: delete secrets -rm testing/{test-env.sh,client-secrets.json,service-account.json} - -exit "$RTN" +exec .kokoro/test-samples-impl.sh diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a9024b15..8912e9b5 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,3 +1,17 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# # See https://pre-commit.com for more information # See https://pre-commit.com/hooks.html for more hooks repos: @@ -12,6 +26,6 @@ repos: hooks: - id: black - repo: https://gitlab.com/pycqa/flake8 - rev: 3.8.4 + rev: 3.9.0 hooks: - id: flake8 diff --git a/CHANGELOG.md b/CHANGELOG.md index 69fe6ffb..df7e6cce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,19 @@ [1]: https://pypi.org/project/google-cloud-bigquery-storage/#history +## [2.4.0](https://www.github.com/googleapis/python-bigquery-storage/compare/v2.3.0...v2.4.0) (2021-04-07) + + +### Features + +* add a Arrow compression options (Only LZ4 for now) ([#166](https://www.github.com/googleapis/python-bigquery-storage/issues/166)) ([1c91a27](https://www.github.com/googleapis/python-bigquery-storage/commit/1c91a276289a0e319f93b136836f81ee943f661c)) +* updates for v1beta2 storage API - Updated comments on BatchCommitWriteStreams - Added new support Bigquery types BIGNUMERIC and INTERVAL to TableSchema - Added read rows schema in ReadRowsResponse - Misc comment updates ([#172](https://www.github.com/googleapis/python-bigquery-storage/issues/172)) ([bef63fb](https://www.github.com/googleapis/python-bigquery-storage/commit/bef63fbb3b7e41e1c0d73f91a2c86d4d24e42151)) + + +### Dependencies + +* update minimum pandas to 0.21.1 ([#165](https://www.github.com/googleapis/python-bigquery-storage/issues/165)) ([8a97763](https://www.github.com/googleapis/python-bigquery-storage/commit/8a977633a81d080f03f6922752adbf4284199dd4)) + ## [2.3.0](https://www.github.com/googleapis/python-bigquery-storage/compare/v2.2.1...v2.3.0) (2021-02-18) diff --git a/docs/conf.py b/docs/conf.py index 96d96148..6119caee 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -1,4 +1,17 @@ # -*- coding: utf-8 -*- +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
# # google-cloud-bigquery-storage documentation build configuration file # diff --git a/google/cloud/bigquery_storage/__init__.py b/google/cloud/bigquery_storage/__init__.py index 227e6184..003bc480 100644 --- a/google/cloud/bigquery_storage/__init__.py +++ b/google/cloud/bigquery_storage/__init__.py @@ -20,6 +20,7 @@ from google.cloud.bigquery_storage_v1 import __version__ from google.cloud.bigquery_storage_v1.types.arrow import ArrowRecordBatch from google.cloud.bigquery_storage_v1.types.arrow import ArrowSchema +from google.cloud.bigquery_storage_v1.types.arrow import ArrowSerializationOptions from google.cloud.bigquery_storage_v1.types.avro import AvroRows from google.cloud.bigquery_storage_v1.types.avro import AvroSchema from google.cloud.bigquery_storage_v1.types.storage import CreateReadSessionRequest @@ -38,6 +39,7 @@ "types", "ArrowRecordBatch", "ArrowSchema", + "ArrowSerializationOptions", "AvroRows", "AvroSchema", "BigQueryReadClient", diff --git a/google/cloud/bigquery_storage_v1/proto/arrow.proto b/google/cloud/bigquery_storage_v1/proto/arrow.proto index 1c54eeab..4b240f52 100644 --- a/google/cloud/bigquery_storage_v1/proto/arrow.proto +++ b/google/cloud/bigquery_storage_v1/proto/arrow.proto @@ -1,4 +1,4 @@ -// Copyright 2019 Google LLC. +// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -11,7 +11,6 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -// syntax = "proto3"; @@ -43,3 +42,19 @@ message ArrowRecordBatch { // The count of rows in `serialized_record_batch`. int64 row_count = 2; } + +// Contains options specific to Arrow Serialization. +message ArrowSerializationOptions { + // Compression codec's supported by Arrow. + enum CompressionCodec { + // If unspecified no compression will be used. + COMPRESSION_UNSPECIFIED = 0; + + // LZ4 Frame (https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md) + LZ4_FRAME = 1; + } + + // The compression codec to use for Arrow buffers in serialized record + // batches. + CompressionCodec buffer_compression = 2; +} diff --git a/google/cloud/bigquery_storage_v1/proto/avro.proto b/google/cloud/bigquery_storage_v1/proto/avro.proto index 9a064447..dee4a6ed 100644 --- a/google/cloud/bigquery_storage_v1/proto/avro.proto +++ b/google/cloud/bigquery_storage_v1/proto/avro.proto @@ -1,4 +1,4 @@ -// Copyright 2019 Google LLC. +// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -11,7 +11,6 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -// syntax = "proto3"; diff --git a/google/cloud/bigquery_storage_v1/proto/storage.proto b/google/cloud/bigquery_storage_v1/proto/storage.proto index 26fcd6ac..a5fa2b9e 100644 --- a/google/cloud/bigquery_storage_v1/proto/storage.proto +++ b/google/cloud/bigquery_storage_v1/proto/storage.proto @@ -1,4 +1,4 @@ -// Copyright 2019 Google LLC. +// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -11,7 +11,6 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
// See the License for the specific language governing permissions and // limitations under the License. -// syntax = "proto3"; @@ -70,7 +69,8 @@ service BigQueryRead { post: "/v1/{read_session.table=projects/*/datasets/*/tables/*}" body: "*" }; - option (google.api.method_signature) = "parent,read_session,max_stream_count"; + option (google.api.method_signature) = + "parent,read_session,max_stream_count"; } // Reads rows from the stream in the format prescribed by the ReadSession. @@ -99,7 +99,8 @@ service BigQueryRead { // original, primary, and residual, that original[0-j] = primary[0-j] and // original[j-n] = residual[0-m] once the streams have been read to // completion. - rpc SplitReadStream(SplitReadStreamRequest) returns (SplitReadStreamResponse) { + rpc SplitReadStream(SplitReadStreamRequest) + returns (SplitReadStreamResponse) { option (google.api.http) = { get: "/v1/{name=projects/*/locations/*/sessions/*/streams/*}" }; @@ -201,6 +202,19 @@ message ReadRowsResponse { // Throttling state. If unset, the latest response still describes // the current throttling status. ThrottleState throttle_state = 5; + + // The schema for the read. If read_options.selected_fields is set, the + // schema may be different from the table schema as it will only contain + // the selected fields. This schema is equivelant to the one returned by + // CreateSession. This field is only populated in the first ReadRowsResponse + // RPC. + oneof schema { + // Output only. Avro schema. + AvroSchema avro_schema = 7 [(google.api.field_behavior) = OUTPUT_ONLY]; + + // Output only. Arrow schema. + ArrowSchema arrow_schema = 8 [(google.api.field_behavior) = OUTPUT_ONLY]; + } } // Request message for `SplitReadStream`. diff --git a/google/cloud/bigquery_storage_v1/proto/stream.proto b/google/cloud/bigquery_storage_v1/proto/stream.proto index febad036..28b2ac1b 100644 --- a/google/cloud/bigquery_storage_v1/proto/stream.proto +++ b/google/cloud/bigquery_storage_v1/proto/stream.proto @@ -1,4 +1,4 @@ -// Copyright 2019 Google LLC. +// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -11,7 +11,6 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -// syntax = "proto3"; @@ -72,17 +71,27 @@ message ReadSession { // "nullable_field is not NULL" // "st_equals(geo_field, st_geofromtext("POINT(2, 2)"))" // "numeric_field BETWEEN 1.0 AND 5.0" + // + // Restricted to a maximum length for 1 MB. string row_restriction = 2; + + // Optional. Options specific to the Apache Arrow output format. + oneof output_format_serialization_options { + ArrowSerializationOptions arrow_serialization_options = 3 + [(google.api.field_behavior) = OPTIONAL]; + } } // Output only. Unique identifier for the session, in the form // `projects/{project_id}/locations/{location}/sessions/{session_id}`. string name = 1 [(google.api.field_behavior) = OUTPUT_ONLY]; - // Output only. Time at which the session becomes invalid. After this time, subsequent - // requests to read this Session will return errors. The expire_time is - // automatically assigned and currently cannot be specified or updated. - google.protobuf.Timestamp expire_time = 2 [(google.api.field_behavior) = OUTPUT_ONLY]; + // Output only. Time at which the session becomes invalid. 
After this time, + // subsequent requests to read this Session will return errors. The + // expire_time is automatically assigned and currently cannot be specified or + // updated. + google.protobuf.Timestamp expire_time = 2 + [(google.api.field_behavior) = OUTPUT_ONLY]; // Immutable. Data format of the output data. DataFormat data_format = 3 [(google.api.field_behavior) = IMMUTABLE]; @@ -102,12 +111,11 @@ message ReadSession { // `projects/{project_id}/datasets/{dataset_id}/tables/{table_id}` string table = 6 [ (google.api.field_behavior) = IMMUTABLE, - (google.api.resource_reference) = { - type: "bigquery.googleapis.com/Table" - } + (google.api.resource_reference) = { type: "bigquery.googleapis.com/Table" } ]; - // Optional. Any modifiers which are applied when reading from the specified table. + // Optional. Any modifiers which are applied when reading from the specified + // table. TableModifiers table_modifiers = 7 [(google.api.field_behavior) = OPTIONAL]; // Optional. Read options for this session (e.g. column selection, filters). diff --git a/google/cloud/bigquery_storage_v1/services/big_query_read/async_client.py b/google/cloud/bigquery_storage_v1/services/big_query_read/async_client.py index 5363e60f..c8c28dab 100644 --- a/google/cloud/bigquery_storage_v1/services/big_query_read/async_client.py +++ b/google/cloud/bigquery_storage_v1/services/big_query_read/async_client.py @@ -81,8 +81,36 @@ class BigQueryReadAsyncClient: BigQueryReadClient.parse_common_location_path ) - from_service_account_info = BigQueryReadClient.from_service_account_info - from_service_account_file = BigQueryReadClient.from_service_account_file + @classmethod + def from_service_account_info(cls, info: dict, *args, **kwargs): + """Creates an instance of this client using the provided credentials info. + + Args: + info (dict): The service account private key info. + args: Additional arguments to pass to the constructor. + kwargs: Additional arguments to pass to the constructor. + + Returns: + BigQueryReadAsyncClient: The constructed client. + """ + return BigQueryReadClient.from_service_account_info.__func__(BigQueryReadAsyncClient, info, *args, **kwargs) # type: ignore + + @classmethod + def from_service_account_file(cls, filename: str, *args, **kwargs): + """Creates an instance of this client using the provided credentials + file. + + Args: + filename (str): The path to the service account private key json + file. + args: Additional arguments to pass to the constructor. + kwargs: Additional arguments to pass to the constructor. + + Returns: + BigQueryReadAsyncClient: The constructed client. 
+ """ + return BigQueryReadClient.from_service_account_file.__func__(BigQueryReadAsyncClient, filename, *args, **kwargs) # type: ignore + from_service_account_json = from_service_account_file @property @@ -260,6 +288,7 @@ async def create_read_session( predicate=retries.if_exception_type( exceptions.DeadlineExceeded, exceptions.ServiceUnavailable, ), + deadline=600.0, ), default_timeout=600.0, client_info=DEFAULT_CLIENT_INFO, @@ -358,6 +387,7 @@ def read_rows( maximum=60.0, multiplier=1.3, predicate=retries.if_exception_type(exceptions.ServiceUnavailable,), + deadline=86400.0, ), default_timeout=86400.0, client_info=DEFAULT_CLIENT_INFO, @@ -429,6 +459,7 @@ async def split_read_stream( predicate=retries.if_exception_type( exceptions.DeadlineExceeded, exceptions.ServiceUnavailable, ), + deadline=600.0, ), default_timeout=600.0, client_info=DEFAULT_CLIENT_INFO, diff --git a/google/cloud/bigquery_storage_v1/services/big_query_read/transports/base.py b/google/cloud/bigquery_storage_v1/services/big_query_read/transports/base.py index 4497158f..991260bd 100644 --- a/google/cloud/bigquery_storage_v1/services/big_query_read/transports/base.py +++ b/google/cloud/bigquery_storage_v1/services/big_query_read/transports/base.py @@ -74,10 +74,10 @@ def __init__( scope (Optional[Sequence[str]]): A list of scopes. quota_project_id (Optional[str]): An optional project to use for billing and quota. - client_info (google.api_core.gapic_v1.client_info.ClientInfo): - The client info used to send a user-agent string along with - API requests. If ``None``, then default info will be used. - Generally, you only need to set this if you're developing + client_info (google.api_core.gapic_v1.client_info.ClientInfo): + The client info used to send a user-agent string along with + API requests. If ``None``, then default info will be used. + Generally, you only need to set this if you're developing your own client library. """ # Save the hostname. Default to port 443 (HTTPS) if none is specified. @@ -85,6 +85,9 @@ def __init__( host += ":443" self._host = host + # Save the scopes. + self._scopes = scopes or self.AUTH_SCOPES + # If no credentials are provided, then determine the appropriate # defaults. if credentials and credentials_file: @@ -94,20 +97,17 @@ def __init__( if credentials_file is not None: credentials, _ = auth.load_credentials_from_file( - credentials_file, scopes=scopes, quota_project_id=quota_project_id + credentials_file, scopes=self._scopes, quota_project_id=quota_project_id ) elif credentials is None: credentials, _ = auth.default( - scopes=scopes, quota_project_id=quota_project_id + scopes=self._scopes, quota_project_id=quota_project_id ) # Save the credentials. self._credentials = credentials - # Lifted into its own function so it can be stubbed out during tests. - self._prep_wrapped_messages(client_info) - def _prep_wrapped_messages(self, client_info): # Precompute the wrapped methods. 
self._wrapped_methods = { @@ -120,6 +120,7 @@ def _prep_wrapped_messages(self, client_info): predicate=retries.if_exception_type( exceptions.DeadlineExceeded, exceptions.ServiceUnavailable, ), + deadline=600.0, ), default_timeout=600.0, client_info=client_info, @@ -131,6 +132,7 @@ def _prep_wrapped_messages(self, client_info): maximum=60.0, multiplier=1.3, predicate=retries.if_exception_type(exceptions.ServiceUnavailable,), + deadline=86400.0, ), default_timeout=86400.0, client_info=client_info, @@ -144,6 +146,7 @@ def _prep_wrapped_messages(self, client_info): predicate=retries.if_exception_type( exceptions.DeadlineExceeded, exceptions.ServiceUnavailable, ), + deadline=600.0, ), default_timeout=600.0, client_info=client_info, diff --git a/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py b/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py index cfb3dbf8..4c7369e9 100644 --- a/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py +++ b/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py @@ -108,7 +108,9 @@ def __init__( google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials`` and ``credentials_file`` are passed. """ + self._grpc_channel = None self._ssl_channel_credentials = ssl_channel_credentials + self._stubs: Dict[str, Callable] = {} if api_mtls_endpoint: warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning) @@ -116,70 +118,50 @@ def __init__( warnings.warn("client_cert_source is deprecated", DeprecationWarning) if channel: - # Sanity check: Ensure that channel and credentials are not both - # provided. + # Ignore credentials if a channel was passed. credentials = False - # If a channel was explicitly provided, set it. self._grpc_channel = channel self._ssl_channel_credentials = None - elif api_mtls_endpoint: - host = ( - api_mtls_endpoint - if ":" in api_mtls_endpoint - else api_mtls_endpoint + ":443" - ) - - if credentials is None: - credentials, _ = auth.default( - scopes=self.AUTH_SCOPES, quota_project_id=quota_project_id - ) - - # Create SSL credentials with client_cert_source or application - # default SSL credentials. - if client_cert_source: - cert, key = client_cert_source() - ssl_credentials = grpc.ssl_channel_credentials( - certificate_chain=cert, private_key=key - ) - else: - ssl_credentials = SslCredentials().ssl_credentials - # create a new channel. The provided one is ignored. - self._grpc_channel = type(self).create_channel( - host, - credentials=credentials, - credentials_file=credentials_file, - ssl_credentials=ssl_credentials, - scopes=scopes or self.AUTH_SCOPES, - quota_project_id=quota_project_id, - options=[ - ("grpc.max_send_message_length", -1), - ("grpc.max_receive_message_length", -1), - ], - ) - self._ssl_channel_credentials = ssl_credentials else: - host = host if ":" in host else host + ":443" + if api_mtls_endpoint: + host = api_mtls_endpoint + + # Create SSL credentials with client_cert_source or application + # default SSL credentials. 
+ if client_cert_source: + cert, key = client_cert_source() + self._ssl_channel_credentials = grpc.ssl_channel_credentials( + certificate_chain=cert, private_key=key + ) + else: + self._ssl_channel_credentials = SslCredentials().ssl_credentials - if credentials is None: - credentials, _ = auth.default( - scopes=self.AUTH_SCOPES, quota_project_id=quota_project_id - ) + else: + if client_cert_source_for_mtls and not ssl_channel_credentials: + cert, key = client_cert_source_for_mtls() + self._ssl_channel_credentials = grpc.ssl_channel_credentials( + certificate_chain=cert, private_key=key + ) - if client_cert_source_for_mtls and not ssl_channel_credentials: - cert, key = client_cert_source_for_mtls() - self._ssl_channel_credentials = grpc.ssl_channel_credentials( - certificate_chain=cert, private_key=key - ) + # The base transport sets the host, credentials and scopes + super().__init__( + host=host, + credentials=credentials, + credentials_file=credentials_file, + scopes=scopes, + quota_project_id=quota_project_id, + client_info=client_info, + ) - # create a new channel. The provided one is ignored. + if not self._grpc_channel: self._grpc_channel = type(self).create_channel( - host, - credentials=credentials, + self._host, + credentials=self._credentials, credentials_file=credentials_file, + scopes=self._scopes, ssl_credentials=self._ssl_channel_credentials, - scopes=scopes or self.AUTH_SCOPES, quota_project_id=quota_project_id, options=[ ("grpc.max_send_message_length", -1), @@ -187,17 +169,8 @@ def __init__( ], ) - self._stubs = {} # type: Dict[str, Callable] - - # Run the base constructor. - super().__init__( - host=host, - credentials=credentials, - credentials_file=credentials_file, - scopes=scopes or self.AUTH_SCOPES, - quota_project_id=quota_project_id, - client_info=client_info, - ) + # Wrap messages. This must be done after self._grpc_channel exists + self._prep_wrapped_messages(client_info) @classmethod def create_channel( @@ -211,7 +184,7 @@ def create_channel( ) -> grpc.Channel: """Create and return a gRPC channel object. Args: - address (Optional[str]): The host for the channel to use. + host (Optional[str]): The host for the channel to use. credentials (Optional[~.Credentials]): The authorization credentials to attach to requests. These credentials identify this application to the service. If diff --git a/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc_asyncio.py b/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc_asyncio.py index 919e6215..5af58e14 100644 --- a/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc_asyncio.py +++ b/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc_asyncio.py @@ -63,7 +63,7 @@ def create_channel( ) -> aio.Channel: """Create and return a gRPC AsyncIO channel object. Args: - address (Optional[str]): The host for the channel to use. + host (Optional[str]): The host for the channel to use. credentials (Optional[~.Credentials]): The authorization credentials to attach to requests. These credentials identify this application to the service. If @@ -141,10 +141,10 @@ def __init__( ignored if ``channel`` or ``ssl_channel_credentials`` is provided. quota_project_id (Optional[str]): An optional project to use for billing and quota. - client_info (google.api_core.gapic_v1.client_info.ClientInfo): - The client info used to send a user-agent string along with - API requests. If ``None``, then default info will be used. 
- Generally, you only need to set this if you're developing + client_info (google.api_core.gapic_v1.client_info.ClientInfo): + The client info used to send a user-agent string along with + API requests. If ``None``, then default info will be used. + Generally, you only need to set this if you're developing your own client library. Raises: @@ -153,7 +153,9 @@ def __init__( google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials`` and ``credentials_file`` are passed. """ + self._grpc_channel = None self._ssl_channel_credentials = ssl_channel_credentials + self._stubs: Dict[str, Callable] = {} if api_mtls_endpoint: warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning) @@ -161,70 +163,50 @@ def __init__( warnings.warn("client_cert_source is deprecated", DeprecationWarning) if channel: - # Sanity check: Ensure that channel and credentials are not both - # provided. + # Ignore credentials if a channel was passed. credentials = False - # If a channel was explicitly provided, set it. self._grpc_channel = channel self._ssl_channel_credentials = None - elif api_mtls_endpoint: - host = ( - api_mtls_endpoint - if ":" in api_mtls_endpoint - else api_mtls_endpoint + ":443" - ) - - if credentials is None: - credentials, _ = auth.default( - scopes=self.AUTH_SCOPES, quota_project_id=quota_project_id - ) - - # Create SSL credentials with client_cert_source or application - # default SSL credentials. - if client_cert_source: - cert, key = client_cert_source() - ssl_credentials = grpc.ssl_channel_credentials( - certificate_chain=cert, private_key=key - ) - else: - ssl_credentials = SslCredentials().ssl_credentials - # create a new channel. The provided one is ignored. - self._grpc_channel = type(self).create_channel( - host, - credentials=credentials, - credentials_file=credentials_file, - ssl_credentials=ssl_credentials, - scopes=scopes or self.AUTH_SCOPES, - quota_project_id=quota_project_id, - options=[ - ("grpc.max_send_message_length", -1), - ("grpc.max_receive_message_length", -1), - ], - ) - self._ssl_channel_credentials = ssl_credentials else: - host = host if ":" in host else host + ":443" + if api_mtls_endpoint: + host = api_mtls_endpoint + + # Create SSL credentials with client_cert_source or application + # default SSL credentials. + if client_cert_source: + cert, key = client_cert_source() + self._ssl_channel_credentials = grpc.ssl_channel_credentials( + certificate_chain=cert, private_key=key + ) + else: + self._ssl_channel_credentials = SslCredentials().ssl_credentials - if credentials is None: - credentials, _ = auth.default( - scopes=self.AUTH_SCOPES, quota_project_id=quota_project_id - ) + else: + if client_cert_source_for_mtls and not ssl_channel_credentials: + cert, key = client_cert_source_for_mtls() + self._ssl_channel_credentials = grpc.ssl_channel_credentials( + certificate_chain=cert, private_key=key + ) - if client_cert_source_for_mtls and not ssl_channel_credentials: - cert, key = client_cert_source_for_mtls() - self._ssl_channel_credentials = grpc.ssl_channel_credentials( - certificate_chain=cert, private_key=key - ) + # The base transport sets the host, credentials and scopes + super().__init__( + host=host, + credentials=credentials, + credentials_file=credentials_file, + scopes=scopes, + quota_project_id=quota_project_id, + client_info=client_info, + ) - # create a new channel. The provided one is ignored. 
+ if not self._grpc_channel: self._grpc_channel = type(self).create_channel( - host, - credentials=credentials, + self._host, + credentials=self._credentials, credentials_file=credentials_file, + scopes=self._scopes, ssl_credentials=self._ssl_channel_credentials, - scopes=scopes or self.AUTH_SCOPES, quota_project_id=quota_project_id, options=[ ("grpc.max_send_message_length", -1), @@ -232,17 +214,8 @@ def __init__( ], ) - # Run the base constructor. - super().__init__( - host=host, - credentials=credentials, - credentials_file=credentials_file, - scopes=scopes or self.AUTH_SCOPES, - quota_project_id=quota_project_id, - client_info=client_info, - ) - - self._stubs = {} + # Wrap messages. This must be done after self._grpc_channel exists + self._prep_wrapped_messages(client_info) @property def grpc_channel(self) -> aio.Channel: diff --git a/google/cloud/bigquery_storage_v1/types/__init__.py b/google/cloud/bigquery_storage_v1/types/__init__.py index 14fc7096..1b0763f0 100644 --- a/google/cloud/bigquery_storage_v1/types/__init__.py +++ b/google/cloud/bigquery_storage_v1/types/__init__.py @@ -16,43 +16,45 @@ # from .arrow import ( - ArrowSchema, ArrowRecordBatch, + ArrowSchema, + ArrowSerializationOptions, ) from .avro import ( - AvroSchema, AvroRows, -) -from .stream import ( - DataFormat, - ReadSession, - ReadStream, - DataFormat, + AvroSchema, ) from .storage import ( CreateReadSessionRequest, ReadRowsRequest, - ThrottleState, - StreamStats, ReadRowsResponse, SplitReadStreamRequest, SplitReadStreamResponse, + StreamStats, + ThrottleState, +) +from .stream import ( + DataFormat, + ReadSession, + ReadStream, + DataFormat, ) __all__ = ( - "ArrowSchema", "ArrowRecordBatch", - "AvroSchema", + "ArrowSchema", + "ArrowSerializationOptions", "AvroRows", - "DataFormat", - "ReadSession", - "ReadStream", - "DataFormat", + "AvroSchema", "CreateReadSessionRequest", "ReadRowsRequest", - "ThrottleState", - "StreamStats", "ReadRowsResponse", "SplitReadStreamRequest", "SplitReadStreamResponse", + "StreamStats", + "ThrottleState", + "DataFormat", + "ReadSession", + "ReadStream", + "DataFormat", ) diff --git a/google/cloud/bigquery_storage_v1/types/arrow.py b/google/cloud/bigquery_storage_v1/types/arrow.py index e77b4576..1fe24e45 100644 --- a/google/cloud/bigquery_storage_v1/types/arrow.py +++ b/google/cloud/bigquery_storage_v1/types/arrow.py @@ -20,7 +20,7 @@ __protobuf__ = proto.module( package="google.cloud.bigquery.storage.v1", - manifest={"ArrowSchema", "ArrowRecordBatch",}, + manifest={"ArrowSchema", "ArrowRecordBatch", "ArrowSerializationOptions",}, ) @@ -55,4 +55,21 @@ class ArrowRecordBatch(proto.Message): row_count = proto.Field(proto.INT64, number=2) +class ArrowSerializationOptions(proto.Message): + r"""Contains options specific to Arrow Serialization. + + Attributes: + buffer_compression (google.cloud.bigquery_storage_v1.types.ArrowSerializationOptions.CompressionCodec): + The compression codec to use for Arrow + buffers in serialized record batches. 
+ """ + + class CompressionCodec(proto.Enum): + r"""Compression codec's supported by Arrow.""" + COMPRESSION_UNSPECIFIED = 0 + LZ4_FRAME = 1 + + buffer_compression = proto.Field(proto.ENUM, number=2, enum=CompressionCodec,) + + __all__ = tuple(sorted(__protobuf__.manifest)) diff --git a/google/cloud/bigquery_storage_v1/types/storage.py b/google/cloud/bigquery_storage_v1/types/storage.py index 367d3bb3..7cd7766e 100644 --- a/google/cloud/bigquery_storage_v1/types/storage.py +++ b/google/cloud/bigquery_storage_v1/types/storage.py @@ -154,6 +154,10 @@ class ReadRowsResponse(proto.Message): Throttling state. If unset, the latest response still describes the current throttling status. + avro_schema (google.cloud.bigquery_storage_v1.types.AvroSchema): + Output only. Avro schema. + arrow_schema (google.cloud.bigquery_storage_v1.types.ArrowSchema): + Output only. Arrow schema. """ avro_rows = proto.Field( @@ -170,6 +174,14 @@ class ReadRowsResponse(proto.Message): throttle_state = proto.Field(proto.MESSAGE, number=5, message="ThrottleState",) + avro_schema = proto.Field( + proto.MESSAGE, number=7, oneof="schema", message=avro.AvroSchema, + ) + + arrow_schema = proto.Field( + proto.MESSAGE, number=8, oneof="schema", message=arrow.ArrowSchema, + ) + class SplitReadStreamRequest(proto.Message): r"""Request message for ``SplitReadStream``. diff --git a/google/cloud/bigquery_storage_v1/types/stream.py b/google/cloud/bigquery_storage_v1/types/stream.py index 34b865eb..4558bedc 100644 --- a/google/cloud/bigquery_storage_v1/types/stream.py +++ b/google/cloud/bigquery_storage_v1/types/stream.py @@ -104,13 +104,24 @@ class TableReadOptions(proto.Message): Examples: "int_field > 5" "date_field = CAST('2014-9-27' as DATE)" "nullable_field is not NULL" "st_equals(geo_field, st_geofromtext("POINT(2, 2)"))" "numeric_field BETWEEN 1.0 - AND 5.0". + AND 5.0" + + Restricted to a maximum length for 1 MB. + arrow_serialization_options (google.cloud.bigquery_storage_v1.types.ArrowSerializationOptions): + """ selected_fields = proto.RepeatedField(proto.STRING, number=1) row_restriction = proto.Field(proto.STRING, number=2) + arrow_serialization_options = proto.Field( + proto.MESSAGE, + number=3, + oneof="output_format_serialization_options", + message=arrow.ArrowSerializationOptions, + ) + name = proto.Field(proto.STRING, number=1) expire_time = proto.Field(proto.MESSAGE, number=2, message=timestamp.Timestamp,) diff --git a/google/cloud/bigquery_storage_v1beta2/proto/arrow.proto b/google/cloud/bigquery_storage_v1beta2/proto/arrow.proto index bc2e4eb1..74fe927b 100644 --- a/google/cloud/bigquery_storage_v1beta2/proto/arrow.proto +++ b/google/cloud/bigquery_storage_v1beta2/proto/arrow.proto @@ -1,4 +1,4 @@ -// Copyright 2020 Google LLC +// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/google/cloud/bigquery_storage_v1beta2/proto/avro.proto b/google/cloud/bigquery_storage_v1beta2/proto/avro.proto index 109ec86a..495132ec 100644 --- a/google/cloud/bigquery_storage_v1beta2/proto/avro.proto +++ b/google/cloud/bigquery_storage_v1beta2/proto/avro.proto @@ -1,4 +1,4 @@ -// Copyright 2020 Google LLC +// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
diff --git a/google/cloud/bigquery_storage_v1beta2/proto/protobuf.proto b/google/cloud/bigquery_storage_v1beta2/proto/protobuf.proto index 741e7d11..11e851be 100644 --- a/google/cloud/bigquery_storage_v1beta2/proto/protobuf.proto +++ b/google/cloud/bigquery_storage_v1beta2/proto/protobuf.proto @@ -1,4 +1,4 @@ -// Copyright 2020 Google LLC +// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -23,7 +23,7 @@ option java_multiple_files = true; option java_outer_classname = "ProtoBufProto"; option java_package = "com.google.cloud.bigquery.storage.v1beta2"; -// Protobuf schema is an API presentation the proto buffer schema. +// ProtoSchema describes the schema of the serialized protocol buffer data rows. message ProtoSchema { // Descriptor for input message. The descriptor has to be self contained, // including all the nested types, excepted for proto buffer well known types @@ -31,7 +31,6 @@ message ProtoSchema { google.protobuf.DescriptorProto proto_descriptor = 1; } -// Protobuf rows. message ProtoRows { // A sequence of rows serialized as a Protocol Buffer. // diff --git a/google/cloud/bigquery_storage_v1beta2/proto/storage.proto b/google/cloud/bigquery_storage_v1beta2/proto/storage.proto index 5538e29f..8c25b846 100644 --- a/google/cloud/bigquery_storage_v1beta2/proto/storage.proto +++ b/google/cloud/bigquery_storage_v1beta2/proto/storage.proto @@ -1,4 +1,4 @@ -// Copyright 2020 Google LLC +// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -71,8 +71,7 @@ service BigQueryRead { post: "/v1beta2/{read_session.table=projects/*/datasets/*/tables/*}" body: "*" }; - option (google.api.method_signature) = - "parent,read_session,max_stream_count"; + option (google.api.method_signature) = "parent,read_session,max_stream_count"; } // Reads rows from the stream in the format prescribed by the ReadSession. @@ -101,8 +100,7 @@ service BigQueryRead { // original, primary, and residual, that original[0-j] = primary[0-j] and // original[j-n] = residual[0-m] once the streams have been read to // completion. - rpc SplitReadStream(SplitReadStreamRequest) - returns (SplitReadStreamResponse) { + rpc SplitReadStream(SplitReadStreamRequest) returns (SplitReadStreamResponse) { option (google.api.http) = { get: "/v1beta2/{name=projects/*/locations/*/sessions/*/streams/*}" }; @@ -171,8 +169,7 @@ service BigQueryWrite { // Finalize a write stream so that no new data can be appended to the // stream. Finalize is not supported on the '_default' stream. - rpc FinalizeWriteStream(FinalizeWriteStreamRequest) - returns (FinalizeWriteStreamResponse) { + rpc FinalizeWriteStream(FinalizeWriteStreamRequest) returns (FinalizeWriteStreamResponse) { option (google.api.http) = { post: "/v1beta2/{name=projects/*/datasets/*/tables/*/streams/*}" body: "*" @@ -185,8 +182,7 @@ service BigQueryWrite { // Streams must be finalized before commit and cannot be committed multiple // times. Once a stream is committed, data in the stream becomes available // for read operations. 
- rpc BatchCommitWriteStreams(BatchCommitWriteStreamsRequest) - returns (BatchCommitWriteStreamsResponse) { + rpc BatchCommitWriteStreams(BatchCommitWriteStreamsRequest) returns (BatchCommitWriteStreamsResponse) { option (google.api.http) = { get: "/v1beta2/{parent=projects/*/datasets/*/tables/*}" }; @@ -303,6 +299,19 @@ message ReadRowsResponse { // Throttling state. If unset, the latest response still describes // the current throttling status. ThrottleState throttle_state = 5; + + // The schema for the read. If read_options.selected_fields is set, the + // schema may be different from the table schema as it will only contain + // the selected fields. This schema is equivelant to the one returned by + // CreateSession. This field is only populated in the first ReadRowsResponse + // RPC. + oneof schema { + // Output only. Avro schema. + AvroSchema avro_schema = 7 [(google.api.field_behavior) = OUTPUT_ONLY]; + + // Output only. Arrow schema. + ArrowSchema arrow_schema = 8 [(google.api.field_behavior) = OUTPUT_ONLY]; + } } // Request message for `SplitReadStream`. @@ -342,7 +351,9 @@ message CreateWriteStreamRequest { // of `projects/{project}/datasets/{dataset}/tables/{table}`. string parent = 1 [ (google.api.field_behavior) = REQUIRED, - (google.api.resource_reference) = { type: "bigquery.googleapis.com/Table" } + (google.api.resource_reference) = { + type: "bigquery.googleapis.com/Table" + } ]; // Required. Stream to be created. @@ -360,9 +371,9 @@ message AppendRowsRequest { ProtoRows rows = 2; } - // Required. The stream that is the target of the append operation. This value - // must be specified for the initial request. If subsequent requests specify - // the stream name, it must equal to the value provided in the first request. + // Required. The stream that is the target of the append operation. This value must be + // specified for the initial request. If subsequent requests specify the + // stream name, it must equal to the value provided in the first request. // To write to the _default stream, populate this field with a string in the // format `projects/{project}/datasets/{dataset}/tables/{table}/_default`. string write_stream = 1 [ @@ -394,7 +405,7 @@ message AppendRowsRequest { // Response message for `AppendRows`. message AppendRowsResponse { - // A success append result. + // AppendResult is returned for successful append requests. message AppendResult { // The row offset at which the last append occurred. The offset will not be // set if appending using default streams. @@ -405,25 +416,32 @@ message AppendRowsResponse { // Result if the append is successful. AppendResult append_result = 1; - // Error in case of request failed. If set, it means rows are not accepted - // into the system. Users can retry or continue with other requests within - // the same connection. - // ALREADY_EXISTS: happens when offset is specified, it means the entire - // request is already appended, it is safe to ignore this error. - // OUT_OF_RANGE: happens when offset is specified, it means the specified - // offset is beyond the end of the stream. - // INVALID_ARGUMENT: error caused by malformed request or data. - // RESOURCE_EXHAUSTED: request rejected due to throttling. Only happens when - // append without offset. - // ABORTED: request processing is aborted because of prior failures, request - // can be retried if previous failure is fixed. - // INTERNAL: server side errors that can be retried. + // Error returned when problems were encountered. 
If present, + // it indicates rows were not accepted into the system. + // Users can retry or continue with other append requests within the + // same connection. + // + // Additional information about error signalling: + // + // ALREADY_EXISTS: Happens when an append specified an offset, and the + // backend already has received data at this offset. Typically encountered + // in retry scenarios, and can be ignored. + // + // OUT_OF_RANGE: Returned when the specified offset in the stream is beyond + // the current end of the stream. + // + // INVALID_ARGUMENT: Indicates a malformed request or data. + // + // ABORTED: Request processing is aborted because of prior failures. The + // request can be retried if previous failure is addressed. + // + // INTERNAL: Indicates server side error(s) that can be retried. google.rpc.Status error = 2; } // If backend detects a schema update, pass it to user so that user can - // use it to input new type of message. It will be empty when there is no - // schema updates. + // use it to input new type of message. It will be empty when no schema + // updates have occurred. TableSchema updated_schema = 3; } @@ -441,9 +459,11 @@ message GetWriteStreamRequest { // Request message for `BatchCommitWriteStreams`. message BatchCommitWriteStreamsRequest { - // Required. Parent table that all the streams should belong to, in the form - // of `projects/{project}/datasets/{dataset}/tables/{table}`. - string parent = 1 [(google.api.field_behavior) = REQUIRED]; + // Required. Parent table that all the streams should belong to, in the form of + // `projects/{project}/datasets/{dataset}/tables/{table}`. + string parent = 1 [ + (google.api.field_behavior) = REQUIRED + ]; // Required. The group of streams that will be committed atomically. repeated string write_streams = 2 [(google.api.field_behavior) = REQUIRED]; @@ -452,11 +472,15 @@ message BatchCommitWriteStreamsRequest { // Response message for `BatchCommitWriteStreams`. message BatchCommitWriteStreamsResponse { // The time at which streams were committed in microseconds granularity. - // This field will only exist when there is no stream errors. + // This field will only exist when there are no stream errors. + // **Note** if this field is not set, it means the commit was not successful. google.protobuf.Timestamp commit_time = 1; // Stream level error if commit failed. Only streams with error will be in // the list. + // If empty, there is no error and all streams are committed successfully. + // If non empty, certain streams have errors and ZERO stream is committed due + // to atomicity guarantee. repeated StorageError stream_errors = 2; } @@ -500,8 +524,9 @@ message FlushRowsResponse { } // Structured custom BigQuery Storage error message. The error can be attached -// as error details in the returned rpc Status. User can use the info to process -// errors in a structural way, rather than having to parse error messages. +// as error details in the returned rpc Status. In particular, the use of error +// codes allows more structured error handling, and reduces the need to evaluate +// unstructured error text strings. message StorageError { // Error code for `StorageError`. enum StorageErrorCode { @@ -522,9 +547,12 @@ message StorageError { INVALID_STREAM_TYPE = 4; // Invalid Stream state. - // For example, you try to commit a stream that is not fianlized or is + // For example, you try to commit a stream that is not finalized or is // garbaged. INVALID_STREAM_STATE = 5; + + // Stream is finalized. 
+ STREAM_FINALIZED = 6; } // BigQuery Storage specific error code. diff --git a/google/cloud/bigquery_storage_v1beta2/proto/stream.proto b/google/cloud/bigquery_storage_v1beta2/proto/stream.proto index 2b0a58c9..d166e987 100644 --- a/google/cloud/bigquery_storage_v1beta2/proto/stream.proto +++ b/google/cloud/bigquery_storage_v1beta2/proto/stream.proto @@ -1,4 +1,4 @@ -// Copyright 2020 Google LLC +// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -74,6 +74,8 @@ message ReadSession { // "nullable_field is not NULL" // "st_equals(geo_field, st_geofromtext("POINT(2, 2)"))" // "numeric_field BETWEEN 1.0 AND 5.0" + // + // Restricted to a maximum length for 1 MB. string row_restriction = 2; // Optional. Options specific to the Apache Arrow output format. diff --git a/google/cloud/bigquery_storage_v1beta2/proto/table.proto b/google/cloud/bigquery_storage_v1beta2/proto/table.proto index fd8a0a75..670a4a64 100644 --- a/google/cloud/bigquery_storage_v1beta2/proto/table.proto +++ b/google/cloud/bigquery_storage_v1beta2/proto/table.proto @@ -1,4 +1,4 @@ -// Copyright 2020 Google LLC +// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -70,6 +70,12 @@ message TableFieldSchema { // Numeric value NUMERIC = 12; + + // BigNumeric value + BIGNUMERIC = 13; + + // Interval + INTERVAL = 14; } enum Mode { diff --git a/google/cloud/bigquery_storage_v1beta2/services/big_query_read/async_client.py b/google/cloud/bigquery_storage_v1beta2/services/big_query_read/async_client.py index 69b6ebe2..7fc8f127 100644 --- a/google/cloud/bigquery_storage_v1beta2/services/big_query_read/async_client.py +++ b/google/cloud/bigquery_storage_v1beta2/services/big_query_read/async_client.py @@ -83,8 +83,36 @@ class BigQueryReadAsyncClient: BigQueryReadClient.parse_common_location_path ) - from_service_account_info = BigQueryReadClient.from_service_account_info - from_service_account_file = BigQueryReadClient.from_service_account_file + @classmethod + def from_service_account_info(cls, info: dict, *args, **kwargs): + """Creates an instance of this client using the provided credentials info. + + Args: + info (dict): The service account private key info. + args: Additional arguments to pass to the constructor. + kwargs: Additional arguments to pass to the constructor. + + Returns: + BigQueryReadAsyncClient: The constructed client. + """ + return BigQueryReadClient.from_service_account_info.__func__(BigQueryReadAsyncClient, info, *args, **kwargs) # type: ignore + + @classmethod + def from_service_account_file(cls, filename: str, *args, **kwargs): + """Creates an instance of this client using the provided credentials + file. + + Args: + filename (str): The path to the service account private key json + file. + args: Additional arguments to pass to the constructor. + kwargs: Additional arguments to pass to the constructor. + + Returns: + BigQueryReadAsyncClient: The constructed client. 
+ """ + return BigQueryReadClient.from_service_account_file.__func__(BigQueryReadAsyncClient, filename, *args, **kwargs) # type: ignore + from_service_account_json = from_service_account_file @property @@ -262,6 +290,7 @@ async def create_read_session( predicate=retries.if_exception_type( exceptions.DeadlineExceeded, exceptions.ServiceUnavailable, ), + deadline=600.0, ), default_timeout=600.0, client_info=DEFAULT_CLIENT_INFO, @@ -360,6 +389,7 @@ def read_rows( maximum=60.0, multiplier=1.3, predicate=retries.if_exception_type(exceptions.ServiceUnavailable,), + deadline=86400.0, ), default_timeout=86400.0, client_info=DEFAULT_CLIENT_INFO, @@ -431,6 +461,7 @@ async def split_read_stream( predicate=retries.if_exception_type( exceptions.DeadlineExceeded, exceptions.ServiceUnavailable, ), + deadline=600.0, ), default_timeout=600.0, client_info=DEFAULT_CLIENT_INFO, diff --git a/google/cloud/bigquery_storage_v1beta2/services/big_query_read/transports/base.py b/google/cloud/bigquery_storage_v1beta2/services/big_query_read/transports/base.py index 72f43ab7..5862d1f9 100644 --- a/google/cloud/bigquery_storage_v1beta2/services/big_query_read/transports/base.py +++ b/google/cloud/bigquery_storage_v1beta2/services/big_query_read/transports/base.py @@ -74,10 +74,10 @@ def __init__( scope (Optional[Sequence[str]]): A list of scopes. quota_project_id (Optional[str]): An optional project to use for billing and quota. - client_info (google.api_core.gapic_v1.client_info.ClientInfo): - The client info used to send a user-agent string along with - API requests. If ``None``, then default info will be used. - Generally, you only need to set this if you're developing + client_info (google.api_core.gapic_v1.client_info.ClientInfo): + The client info used to send a user-agent string along with + API requests. If ``None``, then default info will be used. + Generally, you only need to set this if you're developing your own client library. """ # Save the hostname. Default to port 443 (HTTPS) if none is specified. @@ -85,6 +85,9 @@ def __init__( host += ":443" self._host = host + # Save the scopes. + self._scopes = scopes or self.AUTH_SCOPES + # If no credentials are provided, then determine the appropriate # defaults. if credentials and credentials_file: @@ -94,20 +97,17 @@ def __init__( if credentials_file is not None: credentials, _ = auth.load_credentials_from_file( - credentials_file, scopes=scopes, quota_project_id=quota_project_id + credentials_file, scopes=self._scopes, quota_project_id=quota_project_id ) elif credentials is None: credentials, _ = auth.default( - scopes=scopes, quota_project_id=quota_project_id + scopes=self._scopes, quota_project_id=quota_project_id ) # Save the credentials. self._credentials = credentials - # Lifted into its own function so it can be stubbed out during tests. - self._prep_wrapped_messages(client_info) - def _prep_wrapped_messages(self, client_info): # Precompute the wrapped methods. 
self._wrapped_methods = { @@ -120,6 +120,7 @@ def _prep_wrapped_messages(self, client_info): predicate=retries.if_exception_type( exceptions.DeadlineExceeded, exceptions.ServiceUnavailable, ), + deadline=600.0, ), default_timeout=600.0, client_info=client_info, @@ -131,6 +132,7 @@ def _prep_wrapped_messages(self, client_info): maximum=60.0, multiplier=1.3, predicate=retries.if_exception_type(exceptions.ServiceUnavailable,), + deadline=86400.0, ), default_timeout=86400.0, client_info=client_info, @@ -144,6 +146,7 @@ def _prep_wrapped_messages(self, client_info): predicate=retries.if_exception_type( exceptions.DeadlineExceeded, exceptions.ServiceUnavailable, ), + deadline=600.0, ), default_timeout=600.0, client_info=client_info, diff --git a/google/cloud/bigquery_storage_v1beta2/services/big_query_read/transports/grpc.py b/google/cloud/bigquery_storage_v1beta2/services/big_query_read/transports/grpc.py index 2425d22f..dc2a433b 100644 --- a/google/cloud/bigquery_storage_v1beta2/services/big_query_read/transports/grpc.py +++ b/google/cloud/bigquery_storage_v1beta2/services/big_query_read/transports/grpc.py @@ -110,7 +110,9 @@ def __init__( google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials`` and ``credentials_file`` are passed. """ + self._grpc_channel = None self._ssl_channel_credentials = ssl_channel_credentials + self._stubs: Dict[str, Callable] = {} if api_mtls_endpoint: warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning) @@ -118,70 +120,50 @@ def __init__( warnings.warn("client_cert_source is deprecated", DeprecationWarning) if channel: - # Sanity check: Ensure that channel and credentials are not both - # provided. + # Ignore credentials if a channel was passed. credentials = False - # If a channel was explicitly provided, set it. self._grpc_channel = channel self._ssl_channel_credentials = None - elif api_mtls_endpoint: - host = ( - api_mtls_endpoint - if ":" in api_mtls_endpoint - else api_mtls_endpoint + ":443" - ) - - if credentials is None: - credentials, _ = auth.default( - scopes=self.AUTH_SCOPES, quota_project_id=quota_project_id - ) - - # Create SSL credentials with client_cert_source or application - # default SSL credentials. - if client_cert_source: - cert, key = client_cert_source() - ssl_credentials = grpc.ssl_channel_credentials( - certificate_chain=cert, private_key=key - ) - else: - ssl_credentials = SslCredentials().ssl_credentials - # create a new channel. The provided one is ignored. - self._grpc_channel = type(self).create_channel( - host, - credentials=credentials, - credentials_file=credentials_file, - ssl_credentials=ssl_credentials, - scopes=scopes or self.AUTH_SCOPES, - quota_project_id=quota_project_id, - options=[ - ("grpc.max_send_message_length", -1), - ("grpc.max_receive_message_length", -1), - ], - ) - self._ssl_channel_credentials = ssl_credentials else: - host = host if ":" in host else host + ":443" + if api_mtls_endpoint: + host = api_mtls_endpoint + + # Create SSL credentials with client_cert_source or application + # default SSL credentials. 
+ if client_cert_source: + cert, key = client_cert_source() + self._ssl_channel_credentials = grpc.ssl_channel_credentials( + certificate_chain=cert, private_key=key + ) + else: + self._ssl_channel_credentials = SslCredentials().ssl_credentials - if credentials is None: - credentials, _ = auth.default( - scopes=self.AUTH_SCOPES, quota_project_id=quota_project_id - ) + else: + if client_cert_source_for_mtls and not ssl_channel_credentials: + cert, key = client_cert_source_for_mtls() + self._ssl_channel_credentials = grpc.ssl_channel_credentials( + certificate_chain=cert, private_key=key + ) - if client_cert_source_for_mtls and not ssl_channel_credentials: - cert, key = client_cert_source_for_mtls() - self._ssl_channel_credentials = grpc.ssl_channel_credentials( - certificate_chain=cert, private_key=key - ) + # The base transport sets the host, credentials and scopes + super().__init__( + host=host, + credentials=credentials, + credentials_file=credentials_file, + scopes=scopes, + quota_project_id=quota_project_id, + client_info=client_info, + ) - # create a new channel. The provided one is ignored. + if not self._grpc_channel: self._grpc_channel = type(self).create_channel( - host, - credentials=credentials, + self._host, + credentials=self._credentials, credentials_file=credentials_file, + scopes=self._scopes, ssl_credentials=self._ssl_channel_credentials, - scopes=scopes or self.AUTH_SCOPES, quota_project_id=quota_project_id, options=[ ("grpc.max_send_message_length", -1), @@ -189,17 +171,8 @@ def __init__( ], ) - self._stubs = {} # type: Dict[str, Callable] - - # Run the base constructor. - super().__init__( - host=host, - credentials=credentials, - credentials_file=credentials_file, - scopes=scopes or self.AUTH_SCOPES, - quota_project_id=quota_project_id, - client_info=client_info, - ) + # Wrap messages. This must be done after self._grpc_channel exists + self._prep_wrapped_messages(client_info) @classmethod def create_channel( @@ -213,7 +186,7 @@ def create_channel( ) -> grpc.Channel: """Create and return a gRPC channel object. Args: - address (Optional[str]): The host for the channel to use. + host (Optional[str]): The host for the channel to use. credentials (Optional[~.Credentials]): The authorization credentials to attach to requests. These credentials identify this application to the service. If diff --git a/google/cloud/bigquery_storage_v1beta2/services/big_query_read/transports/grpc_asyncio.py b/google/cloud/bigquery_storage_v1beta2/services/big_query_read/transports/grpc_asyncio.py index f8021d05..0691a5aa 100644 --- a/google/cloud/bigquery_storage_v1beta2/services/big_query_read/transports/grpc_asyncio.py +++ b/google/cloud/bigquery_storage_v1beta2/services/big_query_read/transports/grpc_asyncio.py @@ -65,7 +65,7 @@ def create_channel( ) -> aio.Channel: """Create and return a gRPC AsyncIO channel object. Args: - address (Optional[str]): The host for the channel to use. + host (Optional[str]): The host for the channel to use. credentials (Optional[~.Credentials]): The authorization credentials to attach to requests. These credentials identify this application to the service. If @@ -143,10 +143,10 @@ def __init__( ignored if ``channel`` or ``ssl_channel_credentials`` is provided. quota_project_id (Optional[str]): An optional project to use for billing and quota. - client_info (google.api_core.gapic_v1.client_info.ClientInfo): - The client info used to send a user-agent string along with - API requests. If ``None``, then default info will be used. 
- Generally, you only need to set this if you're developing + client_info (google.api_core.gapic_v1.client_info.ClientInfo): + The client info used to send a user-agent string along with + API requests. If ``None``, then default info will be used. + Generally, you only need to set this if you're developing your own client library. Raises: @@ -155,7 +155,9 @@ def __init__( google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials`` and ``credentials_file`` are passed. """ + self._grpc_channel = None self._ssl_channel_credentials = ssl_channel_credentials + self._stubs: Dict[str, Callable] = {} if api_mtls_endpoint: warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning) @@ -163,70 +165,50 @@ def __init__( warnings.warn("client_cert_source is deprecated", DeprecationWarning) if channel: - # Sanity check: Ensure that channel and credentials are not both - # provided. + # Ignore credentials if a channel was passed. credentials = False - # If a channel was explicitly provided, set it. self._grpc_channel = channel self._ssl_channel_credentials = None - elif api_mtls_endpoint: - host = ( - api_mtls_endpoint - if ":" in api_mtls_endpoint - else api_mtls_endpoint + ":443" - ) - - if credentials is None: - credentials, _ = auth.default( - scopes=self.AUTH_SCOPES, quota_project_id=quota_project_id - ) - - # Create SSL credentials with client_cert_source or application - # default SSL credentials. - if client_cert_source: - cert, key = client_cert_source() - ssl_credentials = grpc.ssl_channel_credentials( - certificate_chain=cert, private_key=key - ) - else: - ssl_credentials = SslCredentials().ssl_credentials - # create a new channel. The provided one is ignored. - self._grpc_channel = type(self).create_channel( - host, - credentials=credentials, - credentials_file=credentials_file, - ssl_credentials=ssl_credentials, - scopes=scopes or self.AUTH_SCOPES, - quota_project_id=quota_project_id, - options=[ - ("grpc.max_send_message_length", -1), - ("grpc.max_receive_message_length", -1), - ], - ) - self._ssl_channel_credentials = ssl_credentials else: - host = host if ":" in host else host + ":443" + if api_mtls_endpoint: + host = api_mtls_endpoint + + # Create SSL credentials with client_cert_source or application + # default SSL credentials. + if client_cert_source: + cert, key = client_cert_source() + self._ssl_channel_credentials = grpc.ssl_channel_credentials( + certificate_chain=cert, private_key=key + ) + else: + self._ssl_channel_credentials = SslCredentials().ssl_credentials - if credentials is None: - credentials, _ = auth.default( - scopes=self.AUTH_SCOPES, quota_project_id=quota_project_id - ) + else: + if client_cert_source_for_mtls and not ssl_channel_credentials: + cert, key = client_cert_source_for_mtls() + self._ssl_channel_credentials = grpc.ssl_channel_credentials( + certificate_chain=cert, private_key=key + ) - if client_cert_source_for_mtls and not ssl_channel_credentials: - cert, key = client_cert_source_for_mtls() - self._ssl_channel_credentials = grpc.ssl_channel_credentials( - certificate_chain=cert, private_key=key - ) + # The base transport sets the host, credentials and scopes + super().__init__( + host=host, + credentials=credentials, + credentials_file=credentials_file, + scopes=scopes, + quota_project_id=quota_project_id, + client_info=client_info, + ) - # create a new channel. The provided one is ignored. 
+ if not self._grpc_channel: self._grpc_channel = type(self).create_channel( - host, - credentials=credentials, + self._host, + credentials=self._credentials, credentials_file=credentials_file, + scopes=self._scopes, ssl_credentials=self._ssl_channel_credentials, - scopes=scopes or self.AUTH_SCOPES, quota_project_id=quota_project_id, options=[ ("grpc.max_send_message_length", -1), @@ -234,17 +216,8 @@ def __init__( ], ) - # Run the base constructor. - super().__init__( - host=host, - credentials=credentials, - credentials_file=credentials_file, - scopes=scopes or self.AUTH_SCOPES, - quota_project_id=quota_project_id, - client_info=client_info, - ) - - self._stubs = {} + # Wrap messages. This must be done after self._grpc_channel exists + self._prep_wrapped_messages(client_info) @property def grpc_channel(self) -> aio.Channel: diff --git a/google/cloud/bigquery_storage_v1beta2/services/big_query_write/async_client.py b/google/cloud/bigquery_storage_v1beta2/services/big_query_write/async_client.py index 378bd9ef..ab60a9a7 100644 --- a/google/cloud/bigquery_storage_v1beta2/services/big_query_write/async_client.py +++ b/google/cloud/bigquery_storage_v1beta2/services/big_query_write/async_client.py @@ -92,8 +92,36 @@ class BigQueryWriteAsyncClient: BigQueryWriteClient.parse_common_location_path ) - from_service_account_info = BigQueryWriteClient.from_service_account_info - from_service_account_file = BigQueryWriteClient.from_service_account_file + @classmethod + def from_service_account_info(cls, info: dict, *args, **kwargs): + """Creates an instance of this client using the provided credentials info. + + Args: + info (dict): The service account private key info. + args: Additional arguments to pass to the constructor. + kwargs: Additional arguments to pass to the constructor. + + Returns: + BigQueryWriteAsyncClient: The constructed client. + """ + return BigQueryWriteClient.from_service_account_info.__func__(BigQueryWriteAsyncClient, info, *args, **kwargs) # type: ignore + + @classmethod + def from_service_account_file(cls, filename: str, *args, **kwargs): + """Creates an instance of this client using the provided credentials + file. + + Args: + filename (str): The path to the service account private key json + file. + args: Additional arguments to pass to the constructor. + kwargs: Additional arguments to pass to the constructor. + + Returns: + BigQueryWriteAsyncClient: The constructed client. 
+ """ + return BigQueryWriteClient.from_service_account_file.__func__(BigQueryWriteAsyncClient, filename, *args, **kwargs) # type: ignore + from_service_account_json = from_service_account_file @property @@ -239,6 +267,7 @@ async def create_write_stream( exceptions.ResourceExhausted, exceptions.ServiceUnavailable, ), + deadline=600.0, ), default_timeout=600.0, client_info=DEFAULT_CLIENT_INFO, @@ -311,6 +340,7 @@ def append_rows( predicate=retries.if_exception_type( exceptions.ResourceExhausted, exceptions.ServiceUnavailable, ), + deadline=86400.0, ), default_timeout=86400.0, client_info=DEFAULT_CLIENT_INFO, @@ -391,6 +421,7 @@ async def get_write_stream( predicate=retries.if_exception_type( exceptions.DeadlineExceeded, exceptions.ServiceUnavailable, ), + deadline=600.0, ), default_timeout=600.0, client_info=DEFAULT_CLIENT_INFO, @@ -471,6 +502,7 @@ async def finalize_write_stream( predicate=retries.if_exception_type( exceptions.DeadlineExceeded, exceptions.ServiceUnavailable, ), + deadline=600.0, ), default_timeout=600.0, client_info=DEFAULT_CLIENT_INFO, @@ -555,6 +587,7 @@ async def batch_commit_write_streams( predicate=retries.if_exception_type( exceptions.DeadlineExceeded, exceptions.ServiceUnavailable, ), + deadline=600.0, ), default_timeout=600.0, client_info=DEFAULT_CLIENT_INFO, @@ -638,6 +671,7 @@ async def flush_rows( predicate=retries.if_exception_type( exceptions.DeadlineExceeded, exceptions.ServiceUnavailable, ), + deadline=600.0, ), default_timeout=600.0, client_info=DEFAULT_CLIENT_INFO, diff --git a/google/cloud/bigquery_storage_v1beta2/services/big_query_write/transports/base.py b/google/cloud/bigquery_storage_v1beta2/services/big_query_write/transports/base.py index 6b9b6a18..4682188d 100644 --- a/google/cloud/bigquery_storage_v1beta2/services/big_query_write/transports/base.py +++ b/google/cloud/bigquery_storage_v1beta2/services/big_query_write/transports/base.py @@ -74,10 +74,10 @@ def __init__( scope (Optional[Sequence[str]]): A list of scopes. quota_project_id (Optional[str]): An optional project to use for billing and quota. - client_info (google.api_core.gapic_v1.client_info.ClientInfo): - The client info used to send a user-agent string along with - API requests. If ``None``, then default info will be used. - Generally, you only need to set this if you're developing + client_info (google.api_core.gapic_v1.client_info.ClientInfo): + The client info used to send a user-agent string along with + API requests. If ``None``, then default info will be used. + Generally, you only need to set this if you're developing your own client library. """ # Save the hostname. Default to port 443 (HTTPS) if none is specified. @@ -85,6 +85,9 @@ def __init__( host += ":443" self._host = host + # Save the scopes. + self._scopes = scopes or self.AUTH_SCOPES + # If no credentials are provided, then determine the appropriate # defaults. if credentials and credentials_file: @@ -94,20 +97,17 @@ def __init__( if credentials_file is not None: credentials, _ = auth.load_credentials_from_file( - credentials_file, scopes=scopes, quota_project_id=quota_project_id + credentials_file, scopes=self._scopes, quota_project_id=quota_project_id ) elif credentials is None: credentials, _ = auth.default( - scopes=scopes, quota_project_id=quota_project_id + scopes=self._scopes, quota_project_id=quota_project_id ) # Save the credentials. self._credentials = credentials - # Lifted into its own function so it can be stubbed out during tests. 
- self._prep_wrapped_messages(client_info) - def _prep_wrapped_messages(self, client_info): # Precompute the wrapped methods. self._wrapped_methods = { @@ -122,6 +122,7 @@ def _prep_wrapped_messages(self, client_info): exceptions.ResourceExhausted, exceptions.ServiceUnavailable, ), + deadline=600.0, ), default_timeout=600.0, client_info=client_info, @@ -135,6 +136,7 @@ def _prep_wrapped_messages(self, client_info): predicate=retries.if_exception_type( exceptions.ResourceExhausted, exceptions.ServiceUnavailable, ), + deadline=86400.0, ), default_timeout=86400.0, client_info=client_info, @@ -148,6 +150,7 @@ def _prep_wrapped_messages(self, client_info): predicate=retries.if_exception_type( exceptions.DeadlineExceeded, exceptions.ServiceUnavailable, ), + deadline=600.0, ), default_timeout=600.0, client_info=client_info, @@ -161,6 +164,7 @@ def _prep_wrapped_messages(self, client_info): predicate=retries.if_exception_type( exceptions.DeadlineExceeded, exceptions.ServiceUnavailable, ), + deadline=600.0, ), default_timeout=600.0, client_info=client_info, @@ -174,6 +178,7 @@ def _prep_wrapped_messages(self, client_info): predicate=retries.if_exception_type( exceptions.DeadlineExceeded, exceptions.ServiceUnavailable, ), + deadline=600.0, ), default_timeout=600.0, client_info=client_info, @@ -187,6 +192,7 @@ def _prep_wrapped_messages(self, client_info): predicate=retries.if_exception_type( exceptions.DeadlineExceeded, exceptions.ServiceUnavailable, ), + deadline=600.0, ), default_timeout=600.0, client_info=client_info, diff --git a/google/cloud/bigquery_storage_v1beta2/services/big_query_write/transports/grpc.py b/google/cloud/bigquery_storage_v1beta2/services/big_query_write/transports/grpc.py index 626c960a..2fd1cebd 100644 --- a/google/cloud/bigquery_storage_v1beta2/services/big_query_write/transports/grpc.py +++ b/google/cloud/bigquery_storage_v1beta2/services/big_query_write/transports/grpc.py @@ -108,7 +108,9 @@ def __init__( google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials`` and ``credentials_file`` are passed. """ + self._grpc_channel = None self._ssl_channel_credentials = ssl_channel_credentials + self._stubs: Dict[str, Callable] = {} if api_mtls_endpoint: warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning) @@ -116,70 +118,50 @@ def __init__( warnings.warn("client_cert_source is deprecated", DeprecationWarning) if channel: - # Sanity check: Ensure that channel and credentials are not both - # provided. + # Ignore credentials if a channel was passed. credentials = False - # If a channel was explicitly provided, set it. self._grpc_channel = channel self._ssl_channel_credentials = None - elif api_mtls_endpoint: - host = ( - api_mtls_endpoint - if ":" in api_mtls_endpoint - else api_mtls_endpoint + ":443" - ) - - if credentials is None: - credentials, _ = auth.default( - scopes=self.AUTH_SCOPES, quota_project_id=quota_project_id - ) - - # Create SSL credentials with client_cert_source or application - # default SSL credentials. - if client_cert_source: - cert, key = client_cert_source() - ssl_credentials = grpc.ssl_channel_credentials( - certificate_chain=cert, private_key=key - ) - else: - ssl_credentials = SslCredentials().ssl_credentials - # create a new channel. The provided one is ignored. 
- self._grpc_channel = type(self).create_channel( - host, - credentials=credentials, - credentials_file=credentials_file, - ssl_credentials=ssl_credentials, - scopes=scopes or self.AUTH_SCOPES, - quota_project_id=quota_project_id, - options=[ - ("grpc.max_send_message_length", -1), - ("grpc.max_receive_message_length", -1), - ], - ) - self._ssl_channel_credentials = ssl_credentials else: - host = host if ":" in host else host + ":443" + if api_mtls_endpoint: + host = api_mtls_endpoint + + # Create SSL credentials with client_cert_source or application + # default SSL credentials. + if client_cert_source: + cert, key = client_cert_source() + self._ssl_channel_credentials = grpc.ssl_channel_credentials( + certificate_chain=cert, private_key=key + ) + else: + self._ssl_channel_credentials = SslCredentials().ssl_credentials - if credentials is None: - credentials, _ = auth.default( - scopes=self.AUTH_SCOPES, quota_project_id=quota_project_id - ) + else: + if client_cert_source_for_mtls and not ssl_channel_credentials: + cert, key = client_cert_source_for_mtls() + self._ssl_channel_credentials = grpc.ssl_channel_credentials( + certificate_chain=cert, private_key=key + ) - if client_cert_source_for_mtls and not ssl_channel_credentials: - cert, key = client_cert_source_for_mtls() - self._ssl_channel_credentials = grpc.ssl_channel_credentials( - certificate_chain=cert, private_key=key - ) + # The base transport sets the host, credentials and scopes + super().__init__( + host=host, + credentials=credentials, + credentials_file=credentials_file, + scopes=scopes, + quota_project_id=quota_project_id, + client_info=client_info, + ) - # create a new channel. The provided one is ignored. + if not self._grpc_channel: self._grpc_channel = type(self).create_channel( - host, - credentials=credentials, + self._host, + credentials=self._credentials, credentials_file=credentials_file, + scopes=self._scopes, ssl_credentials=self._ssl_channel_credentials, - scopes=scopes or self.AUTH_SCOPES, quota_project_id=quota_project_id, options=[ ("grpc.max_send_message_length", -1), @@ -187,17 +169,8 @@ def __init__( ], ) - self._stubs = {} # type: Dict[str, Callable] - - # Run the base constructor. - super().__init__( - host=host, - credentials=credentials, - credentials_file=credentials_file, - scopes=scopes or self.AUTH_SCOPES, - quota_project_id=quota_project_id, - client_info=client_info, - ) + # Wrap messages. This must be done after self._grpc_channel exists + self._prep_wrapped_messages(client_info) @classmethod def create_channel( @@ -211,7 +184,7 @@ def create_channel( ) -> grpc.Channel: """Create and return a gRPC channel object. Args: - address (Optional[str]): The host for the channel to use. + host (Optional[str]): The host for the channel to use. credentials (Optional[~.Credentials]): The authorization credentials to attach to requests. These credentials identify this application to the service. If diff --git a/google/cloud/bigquery_storage_v1beta2/services/big_query_write/transports/grpc_asyncio.py b/google/cloud/bigquery_storage_v1beta2/services/big_query_write/transports/grpc_asyncio.py index a053b100..890f7572 100644 --- a/google/cloud/bigquery_storage_v1beta2/services/big_query_write/transports/grpc_asyncio.py +++ b/google/cloud/bigquery_storage_v1beta2/services/big_query_write/transports/grpc_asyncio.py @@ -63,7 +63,7 @@ def create_channel( ) -> aio.Channel: """Create and return a gRPC AsyncIO channel object. Args: - address (Optional[str]): The host for the channel to use. 
+ host (Optional[str]): The host for the channel to use. credentials (Optional[~.Credentials]): The authorization credentials to attach to requests. These credentials identify this application to the service. If @@ -141,10 +141,10 @@ def __init__( ignored if ``channel`` or ``ssl_channel_credentials`` is provided. quota_project_id (Optional[str]): An optional project to use for billing and quota. - client_info (google.api_core.gapic_v1.client_info.ClientInfo): - The client info used to send a user-agent string along with - API requests. If ``None``, then default info will be used. - Generally, you only need to set this if you're developing + client_info (google.api_core.gapic_v1.client_info.ClientInfo): + The client info used to send a user-agent string along with + API requests. If ``None``, then default info will be used. + Generally, you only need to set this if you're developing your own client library. Raises: @@ -153,7 +153,9 @@ def __init__( google.api_core.exceptions.DuplicateCredentialArgs: If both ``credentials`` and ``credentials_file`` are passed. """ + self._grpc_channel = None self._ssl_channel_credentials = ssl_channel_credentials + self._stubs: Dict[str, Callable] = {} if api_mtls_endpoint: warnings.warn("api_mtls_endpoint is deprecated", DeprecationWarning) @@ -161,70 +163,50 @@ def __init__( warnings.warn("client_cert_source is deprecated", DeprecationWarning) if channel: - # Sanity check: Ensure that channel and credentials are not both - # provided. + # Ignore credentials if a channel was passed. credentials = False - # If a channel was explicitly provided, set it. self._grpc_channel = channel self._ssl_channel_credentials = None - elif api_mtls_endpoint: - host = ( - api_mtls_endpoint - if ":" in api_mtls_endpoint - else api_mtls_endpoint + ":443" - ) - - if credentials is None: - credentials, _ = auth.default( - scopes=self.AUTH_SCOPES, quota_project_id=quota_project_id - ) - - # Create SSL credentials with client_cert_source or application - # default SSL credentials. - if client_cert_source: - cert, key = client_cert_source() - ssl_credentials = grpc.ssl_channel_credentials( - certificate_chain=cert, private_key=key - ) - else: - ssl_credentials = SslCredentials().ssl_credentials - # create a new channel. The provided one is ignored. - self._grpc_channel = type(self).create_channel( - host, - credentials=credentials, - credentials_file=credentials_file, - ssl_credentials=ssl_credentials, - scopes=scopes or self.AUTH_SCOPES, - quota_project_id=quota_project_id, - options=[ - ("grpc.max_send_message_length", -1), - ("grpc.max_receive_message_length", -1), - ], - ) - self._ssl_channel_credentials = ssl_credentials else: - host = host if ":" in host else host + ":443" + if api_mtls_endpoint: + host = api_mtls_endpoint + + # Create SSL credentials with client_cert_source or application + # default SSL credentials. 
+ if client_cert_source: + cert, key = client_cert_source() + self._ssl_channel_credentials = grpc.ssl_channel_credentials( + certificate_chain=cert, private_key=key + ) + else: + self._ssl_channel_credentials = SslCredentials().ssl_credentials - if credentials is None: - credentials, _ = auth.default( - scopes=self.AUTH_SCOPES, quota_project_id=quota_project_id - ) + else: + if client_cert_source_for_mtls and not ssl_channel_credentials: + cert, key = client_cert_source_for_mtls() + self._ssl_channel_credentials = grpc.ssl_channel_credentials( + certificate_chain=cert, private_key=key + ) - if client_cert_source_for_mtls and not ssl_channel_credentials: - cert, key = client_cert_source_for_mtls() - self._ssl_channel_credentials = grpc.ssl_channel_credentials( - certificate_chain=cert, private_key=key - ) + # The base transport sets the host, credentials and scopes + super().__init__( + host=host, + credentials=credentials, + credentials_file=credentials_file, + scopes=scopes, + quota_project_id=quota_project_id, + client_info=client_info, + ) - # create a new channel. The provided one is ignored. + if not self._grpc_channel: self._grpc_channel = type(self).create_channel( - host, - credentials=credentials, + self._host, + credentials=self._credentials, credentials_file=credentials_file, + scopes=self._scopes, ssl_credentials=self._ssl_channel_credentials, - scopes=scopes or self.AUTH_SCOPES, quota_project_id=quota_project_id, options=[ ("grpc.max_send_message_length", -1), @@ -232,17 +214,8 @@ def __init__( ], ) - # Run the base constructor. - super().__init__( - host=host, - credentials=credentials, - credentials_file=credentials_file, - scopes=scopes or self.AUTH_SCOPES, - quota_project_id=quota_project_id, - client_info=client_info, - ) - - self._stubs = {} + # Wrap messages. 
This must be done after self._grpc_channel exists + self._prep_wrapped_messages(client_info) @property def grpc_channel(self) -> aio.Channel: diff --git a/google/cloud/bigquery_storage_v1beta2/types/__init__.py b/google/cloud/bigquery_storage_v1beta2/types/__init__.py index ba1cac28..359fc964 100644 --- a/google/cloud/bigquery_storage_v1beta2/types/__init__.py +++ b/google/cloud/bigquery_storage_v1beta2/types/__init__.py @@ -16,81 +16,81 @@ # from .arrow import ( - ArrowSchema, ArrowRecordBatch, + ArrowSchema, ArrowSerializationOptions, ) from .avro import ( - AvroSchema, AvroRows, + AvroSchema, ) from .protobuf import ( - ProtoSchema, ProtoRows, -) -from .table import ( - TableSchema, - TableFieldSchema, -) -from .stream import ( - DataFormat, - ReadSession, - ReadStream, - WriteStream, - DataFormat, + ProtoSchema, ) from .storage import ( - CreateReadSessionRequest, - ReadRowsRequest, - ThrottleState, - StreamStats, - ReadRowsResponse, - SplitReadStreamRequest, - SplitReadStreamResponse, - CreateWriteStreamRequest, AppendRowsRequest, AppendRowsResponse, - GetWriteStreamRequest, BatchCommitWriteStreamsRequest, BatchCommitWriteStreamsResponse, + CreateReadSessionRequest, + CreateWriteStreamRequest, FinalizeWriteStreamRequest, FinalizeWriteStreamResponse, FlushRowsRequest, FlushRowsResponse, + GetWriteStreamRequest, + ReadRowsRequest, + ReadRowsResponse, + SplitReadStreamRequest, + SplitReadStreamResponse, StorageError, + StreamStats, + ThrottleState, +) +from .stream import ( + DataFormat, + ReadSession, + ReadStream, + WriteStream, + DataFormat, +) +from .table import ( + TableFieldSchema, + TableSchema, ) __all__ = ( - "ArrowSchema", "ArrowRecordBatch", + "ArrowSchema", "ArrowSerializationOptions", - "AvroSchema", "AvroRows", - "ProtoSchema", + "AvroSchema", "ProtoRows", - "TableSchema", - "TableFieldSchema", - "DataFormat", - "ReadSession", - "ReadStream", - "WriteStream", - "DataFormat", - "CreateReadSessionRequest", - "ReadRowsRequest", - "ThrottleState", - "StreamStats", - "ReadRowsResponse", - "SplitReadStreamRequest", - "SplitReadStreamResponse", - "CreateWriteStreamRequest", + "ProtoSchema", "AppendRowsRequest", "AppendRowsResponse", - "GetWriteStreamRequest", "BatchCommitWriteStreamsRequest", "BatchCommitWriteStreamsResponse", + "CreateReadSessionRequest", + "CreateWriteStreamRequest", "FinalizeWriteStreamRequest", "FinalizeWriteStreamResponse", "FlushRowsRequest", "FlushRowsResponse", + "GetWriteStreamRequest", + "ReadRowsRequest", + "ReadRowsResponse", + "SplitReadStreamRequest", + "SplitReadStreamResponse", "StorageError", + "StreamStats", + "ThrottleState", + "DataFormat", + "ReadSession", + "ReadStream", + "WriteStream", + "DataFormat", + "TableFieldSchema", + "TableSchema", ) diff --git a/google/cloud/bigquery_storage_v1beta2/types/protobuf.py b/google/cloud/bigquery_storage_v1beta2/types/protobuf.py index 99c0543c..9b88202f 100644 --- a/google/cloud/bigquery_storage_v1beta2/types/protobuf.py +++ b/google/cloud/bigquery_storage_v1beta2/types/protobuf.py @@ -28,8 +28,8 @@ class ProtoSchema(proto.Message): - r"""Protobuf schema is an API presentation the proto buffer - schema. + r"""ProtoSchema describes the schema of the serialized protocol + buffer data rows. Attributes: proto_descriptor (google.protobuf.descriptor_pb2.DescriptorProto): @@ -47,7 +47,7 @@ class ProtoSchema(proto.Message): class ProtoRows(proto.Message): - r"""Protobuf rows. 
+ r""" Attributes: serialized_rows (Sequence[bytes]): diff --git a/google/cloud/bigquery_storage_v1beta2/types/storage.py b/google/cloud/bigquery_storage_v1beta2/types/storage.py index 85369388..ee6d5210 100644 --- a/google/cloud/bigquery_storage_v1beta2/types/storage.py +++ b/google/cloud/bigquery_storage_v1beta2/types/storage.py @@ -170,6 +170,10 @@ class ReadRowsResponse(proto.Message): Throttling state. If unset, the latest response still describes the current throttling status. + avro_schema (google.cloud.bigquery_storage_v1beta2.types.AvroSchema): + Output only. Avro schema. + arrow_schema (google.cloud.bigquery_storage_v1beta2.types.ArrowSchema): + Output only. Arrow schema. """ avro_rows = proto.Field( @@ -186,6 +190,14 @@ class ReadRowsResponse(proto.Message): throttle_state = proto.Field(proto.MESSAGE, number=5, message="ThrottleState",) + avro_schema = proto.Field( + proto.MESSAGE, number=7, oneof="schema", message=avro.AvroSchema, + ) + + arrow_schema = proto.Field( + proto.MESSAGE, number=8, oneof="schema", message=arrow.ArrowSchema, + ) + class SplitReadStreamRequest(proto.Message): r"""Request message for ``SplitReadStream``. @@ -307,28 +319,38 @@ class AppendRowsResponse(proto.Message): append_result (google.cloud.bigquery_storage_v1beta2.types.AppendRowsResponse.AppendResult): Result if the append is successful. error (google.rpc.status_pb2.Status): - Error in case of request failed. If set, it means rows are - not accepted into the system. Users can retry or continue - with other requests within the same connection. - ALREADY_EXISTS: happens when offset is specified, it means - the entire request is already appended, it is safe to ignore - this error. OUT_OF_RANGE: happens when offset is specified, - it means the specified offset is beyond the end of the - stream. INVALID_ARGUMENT: error caused by malformed request - or data. RESOURCE_EXHAUSTED: request rejected due to - throttling. Only happens when append without offset. - ABORTED: request processing is aborted because of prior - failures, request can be retried if previous failure is - fixed. INTERNAL: server side errors that can be retried. + Error returned when problems were encountered. If present, + it indicates rows were not accepted into the system. Users + can retry or continue with other append requests within the + same connection. + + Additional information about error signalling: + + ALREADY_EXISTS: Happens when an append specified an offset, + and the backend already has received data at this offset. + Typically encountered in retry scenarios, and can be + ignored. + + OUT_OF_RANGE: Returned when the specified offset in the + stream is beyond the current end of the stream. + + INVALID_ARGUMENT: Indicates a malformed request or data. + + ABORTED: Request processing is aborted because of prior + failures. The request can be retried if previous failure is + addressed. + + INTERNAL: Indicates server side error(s) that can be + retried. updated_schema (google.cloud.bigquery_storage_v1beta2.types.TableSchema): If backend detects a schema update, pass it to user so that user can use it to input new - type of message. It will be empty when there is - no schema updates. + type of message. It will be empty when no schema + updates have occurred. """ class AppendResult(proto.Message): - r"""A success append result. + r"""AppendResult is returned for successful append requests. 
Attributes: offset (google.protobuf.wrappers_pb2.Int64Value): @@ -385,12 +407,17 @@ class BatchCommitWriteStreamsResponse(proto.Message): Attributes: commit_time (google.protobuf.timestamp_pb2.Timestamp): - The time at which streams were committed in - microseconds granularity. This field will only - exist when there is no stream errors. + The time at which streams were committed in microseconds + granularity. This field will only exist when there are no + stream errors. **Note** if this field is not set, it means + the commit was not successful. stream_errors (Sequence[google.cloud.bigquery_storage_v1beta2.types.StorageError]): Stream level error if commit failed. Only streams with error will be in the list. + If empty, there is no error and all streams are + committed successfully. If non empty, certain + streams have errors and ZERO stream is committed + due to atomicity guarantee. """ commit_time = proto.Field(proto.MESSAGE, number=1, message=timestamp.Timestamp,) @@ -455,9 +482,10 @@ class FlushRowsResponse(proto.Message): class StorageError(proto.Message): r"""Structured custom BigQuery Storage error message. The error - can be attached as error details in the returned rpc Status. - User can use the info to process errors in a structural way, - rather than having to parse error messages. + can be attached as error details in the returned rpc Status. In + particular, the use of error codes allows more structured error + handling, and reduces the need to evaluate unstructured error + text strings. Attributes: code (google.cloud.bigquery_storage_v1beta2.types.StorageError.StorageErrorCode): @@ -476,6 +504,7 @@ class StorageErrorCode(proto.Enum): STREAM_NOT_FOUND = 3 INVALID_STREAM_TYPE = 4 INVALID_STREAM_STATE = 5 + STREAM_FINALIZED = 6 code = proto.Field(proto.ENUM, number=1, enum=StorageErrorCode,) diff --git a/google/cloud/bigquery_storage_v1beta2/types/stream.py b/google/cloud/bigquery_storage_v1beta2/types/stream.py index 77fb444c..e635e5a6 100644 --- a/google/cloud/bigquery_storage_v1beta2/types/stream.py +++ b/google/cloud/bigquery_storage_v1beta2/types/stream.py @@ -105,7 +105,9 @@ class TableReadOptions(proto.Message): Examples: "int_field > 5" "date_field = CAST('2014-9-27' as DATE)" "nullable_field is not NULL" "st_equals(geo_field, st_geofromtext("POINT(2, 2)"))" "numeric_field BETWEEN 1.0 - AND 5.0". + AND 5.0" + + Restricted to a maximum length for 1 MB. arrow_serialization_options (google.cloud.bigquery_storage_v1beta2.types.ArrowSerializationOptions): Optional. Options specific to the Apache Arrow output format. 
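
The read-path changes above are easier to follow with a concrete usage sketch. The snippet below is a minimal, hedged example of creating a filtered v1beta2 read session and iterating its responses; the project and table paths are hypothetical placeholders, it assumes application default credentials, and it only uses the flattened parameters and fields that appear in this diff (`parent`, `read_session`, `max_stream_count`, `read_stream`, `row_count`). Note that `row_restriction` is limited to a maximum length of 1 MB.

```python
from google.cloud.bigquery_storage_v1beta2 import types
from google.cloud.bigquery_storage_v1beta2.services.big_query_read import (
    BigQueryReadClient,
)

# Hypothetical identifiers, for illustration only.
project_id = "my-project"
table_path = "projects/my-project/datasets/my_dataset/tables/my_table"

client = BigQueryReadClient()  # assumes application default credentials

requested_session = types.ReadSession(
    table=table_path,
    data_format=types.DataFormat.AVRO,
    read_options=types.ReadSession.TableReadOptions(
        selected_fields=["first_name", "age"],
        # Pushed-down filter; per the docs above it may be at most 1 MB long.
        row_restriction="age > 18",
    ),
)

session = client.create_read_session(
    parent=f"projects/{project_id}",
    read_session=requested_session,
    max_stream_count=1,
)

if session.streams:
    # read_rows is a server-streaming call; each response carries a row_count
    # and the serialized rows in the requested format.
    for response in client.read_rows(read_stream=session.streams[0].name):
        print(response.row_count)
```

With the `avro_schema` / `arrow_schema` oneof added to `ReadRowsResponse` in this change, individual responses may now also carry the output schema, so a reader does not have to rely solely on the schema attached to the session.
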
diff --git a/google/cloud/bigquery_storage_v1beta2/types/table.py b/google/cloud/bigquery_storage_v1beta2/types/table.py index f1a209e0..f91b07b4 100644 --- a/google/cloud/bigquery_storage_v1beta2/types/table.py +++ b/google/cloud/bigquery_storage_v1beta2/types/table.py @@ -72,6 +72,8 @@ class Type(proto.Enum): DATETIME = 10 GEOGRAPHY = 11 NUMERIC = 12 + BIGNUMERIC = 13 + INTERVAL = 14 class Mode(proto.Enum): r"""""" diff --git a/noxfile.py b/noxfile.py index c61b3fd6..d775cb8a 100644 --- a/noxfile.py +++ b/noxfile.py @@ -18,6 +18,7 @@ from __future__ import absolute_import import os +import pathlib import shutil import nox @@ -30,6 +31,8 @@ SYSTEM_TEST_PYTHON_VERSIONS = ["3.8"] UNIT_TEST_PYTHON_VERSIONS = ["3.6", "3.7", "3.8", "3.9"] +CURRENT_DIRECTORY = pathlib.Path(__file__).parent.absolute() + # 'docfx' is excluded since it only needs to run in 'docs-presubmit' nox.options.sessions = [ "unit", @@ -41,6 +44,9 @@ "docs", ] +# Error if a python version is missing +nox.options.error_on_missing_interpreters = True + @nox.session(python=DEFAULT_PYTHON_VERSION) def lint(session): @@ -81,13 +87,15 @@ def lint_setup_py(session): def default(session): # Install all test dependencies, then install this package in-place. - session.install("asyncmock", "pytest-asyncio") - session.install( - "mock", "pytest", "pytest-cov", + constraints_path = str( + CURRENT_DIRECTORY / "testing" / f"constraints-{session.python}.txt" ) + session.install("asyncmock", "pytest-asyncio", "-c", constraints_path) - session.install("-e", ".[fastavro,pandas,pyarrow]") + session.install("mock", "pytest", "pytest-cov", "-c", constraints_path) + + session.install("-e", ".[fastavro,pandas,pyarrow]", "-c", constraints_path) # Run py.test against the unit tests. session.run( @@ -114,6 +122,9 @@ def unit(session): @nox.session(python=SYSTEM_TEST_PYTHON_VERSIONS) def system(session): """Run the system test suite.""" + constraints_path = str( + CURRENT_DIRECTORY / "testing" / f"constraints-{session.python}.txt" + ) system_test_path = os.path.join("tests", "system.py") system_test_folder_path = os.path.join("tests", "system") @@ -123,6 +134,9 @@ def system(session): # Sanity check: Only run tests if the environment variable is set. if not os.environ.get("GOOGLE_APPLICATION_CREDENTIALS", ""): session.skip("Credentials must be set via environment variable") + # Install pyopenssl for mTLS testing. + if os.environ.get("GOOGLE_API_USE_CLIENT_CERTIFICATE", "false") == "true": + session.install("pyopenssl") system_test_exists = os.path.exists(system_test_path) system_test_folder_exists = os.path.exists(system_test_folder_path) @@ -135,8 +149,15 @@ def system(session): # Install all test dependencies, then install this package into the # virtualenv's dist-packages. - session.install("mock", "pytest", "google-cloud-testutils", "google-cloud-bigquery") - session.install("-e", ".[fastavro,pandas,pyarrow]") + session.install( + "mock", + "pytest", + "google-cloud-testutils", + "google-cloud-bigquery", + "-c", + constraints_path, + ) + session.install("-e", ".[fastavro,pandas,pyarrow]", "-c", constraints_path) # Run py.test against the system tests. if system_test_exists: @@ -197,9 +218,9 @@ def docfx(session): """Build the docfx yaml files for this library.""" session.install("-e", ".") - # sphinx-docfx-yaml supports up to sphinx version 1.5.5. 
- # https://github.com/docascode/sphinx-docfx-yaml/issues/97 - session.install("sphinx==1.5.5", "alabaster", "recommonmark", "sphinx-docfx-yaml") + session.install( + "sphinx<3.0.0", "alabaster", "recommonmark", "gcp-sphinx-docfx-yaml" + ) shutil.rmtree(os.path.join("docs", "_build"), ignore_errors=True) session.run( diff --git a/renovate.json b/renovate.json index 4fa94931..f08bc22c 100644 --- a/renovate.json +++ b/renovate.json @@ -1,5 +1,6 @@ { "extends": [ "config:base", ":preserveSemverRanges" - ] + ], + "ignorePaths": [".pre-commit-config.yaml"] } diff --git a/samples/quickstart/requirements.txt b/samples/quickstart/requirements.txt index ddc998aa..9155b4b9 100644 --- a/samples/quickstart/requirements.txt +++ b/samples/quickstart/requirements.txt @@ -1,2 +1,2 @@ fastavro -google-cloud-bigquery-storage==2.2.1 +google-cloud-bigquery-storage==2.3.0 diff --git a/samples/to_dataframe/requirements.txt b/samples/to_dataframe/requirements.txt index 24a67b26..c5bae2eb 100644 --- a/samples/to_dataframe/requirements.txt +++ b/samples/to_dataframe/requirements.txt @@ -1,6 +1,6 @@ -google-auth==1.27.0 -google-cloud-bigquery-storage==2.2.1 -google-cloud-bigquery==2.8.0 +google-auth==1.28.0 +google-cloud-bigquery-storage==2.3.0 +google-cloud-bigquery==2.13.1 pyarrow==3.0.0 ipython==7.10.2; python_version > '3.0' ipython==5.9.0; python_version < '3.0' diff --git a/setup.py b/setup.py index 328177ce..8de1c6c2 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ name = "google-cloud-bigquery-storage" description = "BigQuery Storage API API client library" -version = "2.3.0" +version = "2.4.0" release_status = "Development Status :: 5 - Production/Stable" dependencies = [ "google-api-core[grpc] >= 1.22.2, < 2.0.0dev", @@ -29,7 +29,7 @@ "libcst >= 0.2.5", ] extras = { - "pandas": "pandas>=0.17.1", + "pandas": "pandas>=0.21.1", "fastavro": "fastavro>=0.21.2", "pyarrow": "pyarrow>=0.15.0", } diff --git a/synth.metadata b/synth.metadata index 76b66ecd..1eec6cb9 100644 --- a/synth.metadata +++ b/synth.metadata @@ -4,29 +4,29 @@ "git": { "name": ".", "remote": "https://github.com/googleapis/python-bigquery-storage.git", - "sha": "7ac4df6b4c484768dc66e0afdc2cb658f0bb8a3d" + "sha": "4a714f77542e8f6b7118fdd1d032ebee45c4f911" } }, { "git": { "name": "googleapis", "remote": "https://github.com/googleapis/googleapis.git", - "sha": "20712b8fe95001b312f62c6c5f33e3e3ec92cfaf", - "internalRef": "354996675" + "sha": "b1614aa0668564ec41d78b72cf776e0292ffc98c", + "internalRef": "366811078" } }, { "git": { "name": "synthtool", "remote": "https://github.com/googleapis/synthtool.git", - "sha": "d17674372e27fb8f23013935e794aa37502071aa" + "sha": "5b5bf6d519b2d658d9f2e483d9f6f3d0ba8ee6bc" } }, { "git": { "name": "synthtool", "remote": "https://github.com/googleapis/synthtool.git", - "sha": "d17674372e27fb8f23013935e794aa37502071aa" + "sha": "5b5bf6d519b2d658d9f2e483d9f6f3d0ba8ee6bc" } } ], @@ -83,16 +83,21 @@ ".kokoro/samples/lint/presubmit.cfg", ".kokoro/samples/python3.6/common.cfg", ".kokoro/samples/python3.6/continuous.cfg", + ".kokoro/samples/python3.6/periodic-head.cfg", ".kokoro/samples/python3.6/periodic.cfg", ".kokoro/samples/python3.6/presubmit.cfg", ".kokoro/samples/python3.7/common.cfg", ".kokoro/samples/python3.7/continuous.cfg", + ".kokoro/samples/python3.7/periodic-head.cfg", ".kokoro/samples/python3.7/periodic.cfg", ".kokoro/samples/python3.7/presubmit.cfg", ".kokoro/samples/python3.8/common.cfg", ".kokoro/samples/python3.8/continuous.cfg", + ".kokoro/samples/python3.8/periodic-head.cfg", 
".kokoro/samples/python3.8/periodic.cfg", ".kokoro/samples/python3.8/presubmit.cfg", + ".kokoro/test-samples-against-head.sh", + ".kokoro/test-samples-impl.sh", ".kokoro/test-samples.sh", ".kokoro/trampoline.sh", ".kokoro/trampoline_v2.sh", diff --git a/synth.py b/synth.py index 896ba9b4..851d3a15 100644 --- a/synth.py +++ b/synth.py @@ -35,6 +35,7 @@ s.move( library, excludes=[ + "bigquery-storage-*-py.tar.gz", "docs/conf.py", "docs/index.rst", f"google/cloud/bigquery_storage_{version}/__init__.py", diff --git a/testing/constraints-3.6.txt b/testing/constraints-3.6.txt index 60b1b798..f9a2ecfa 100644 --- a/testing/constraints-3.6.txt +++ b/testing/constraints-3.6.txt @@ -9,5 +9,5 @@ google-api-core==1.22.2 proto-plus==1.4.0 libcst==0.2.5 fastavro==0.21.2 -pandas==0.17.1 -pyarrow==0.15.0 \ No newline at end of file +pandas==0.21.1 +pyarrow==0.15.0 diff --git a/tests/system/__init__.py b/tests/system/__init__.py new file mode 100644 index 00000000..7e1ec16e --- /dev/null +++ b/tests/system/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/system/conftest.py b/tests/system/conftest.py index dd42e736..a18777dd 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -20,8 +20,10 @@ import pytest -_TABLE_FORMAT = "projects/{}/datasets/{}/tables/{}" +from . 
import helpers + +_TABLE_FORMAT = "projects/{}/datasets/{}/tables/{}" _ASSETS_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), "assets") @@ -88,7 +90,7 @@ def dataset(project_id, bq_client): bq_client.delete_dataset(dataset, delete_contents=True) -@pytest.fixture(scope="session") +@pytest.fixture def table(project_id, dataset, bq_client): from google.cloud import bigquery @@ -98,8 +100,10 @@ def table(project_id, dataset, bq_client): bigquery.SchemaField("age", "INTEGER", mode="NULLABLE"), ] - table_id = "{}.{}.{}".format(project_id, dataset.dataset_id, "users") - bq_table = bigquery.Table(table_id, schema=schema) + unique_suffix = str(uuid.uuid4()).replace("-", "_") + table_id = "users_" + unique_suffix + table_id_full = f"{project_id}.{dataset.dataset_id}.{table_id}" + bq_table = bigquery.Table(table_id_full, schema=schema) created_table = bq_client.create_table(bq_table) yield created_table @@ -154,7 +158,7 @@ def all_types_table_ref(project_id, dataset, bq_client): ) yield table_ref - bq_client.delete_table(created_table) + helpers.retry_403(bq_client.delete_table)(created_table, not_found_ok=True) @pytest.fixture @@ -182,7 +186,7 @@ def ingest_partition_table_ref(project_id, dataset, bq_client): ) yield table_ref - bq_client.delete_table(created_table) + helpers.retry_403(bq_client.delete_table)(created_table, not_found_ok=True) @pytest.fixture @@ -209,29 +213,39 @@ def col_partition_table_ref(project_id, dataset, bq_client): ) yield table_ref - bq_client.delete_table(created_table) + helpers.retry_403(bq_client.delete_table)(created_table, not_found_ok=True) @pytest.fixture -def table_with_data_ref(dataset, table, bq_client): +def table_with_data_ref(project_id, dataset, bq_client): from google.cloud import bigquery + unique_suffix = str(uuid.uuid4()).replace("-", "_") + table_id = "users_" + unique_suffix + table_id_full = f"{project_id}.{dataset.dataset_id}.{table_id}" + schema = [ + bigquery.SchemaField("first_name", "STRING", mode="NULLABLE"), + bigquery.SchemaField("last_name", "STRING", mode="NULLABLE"), + bigquery.SchemaField("age", "INTEGER", mode="NULLABLE"), + ] + job_config = bigquery.LoadJobConfig() job_config.source_format = bigquery.SourceFormat.CSV job_config.skip_leading_rows = 1 - job_config.schema = table.schema + job_config.schema = schema filename = os.path.join(_ASSETS_DIR, "people_data.csv") - with open(filename, "rb") as source_file: - job = bq_client.load_table_from_file(source_file, table, job_config=job_config) + def create_table(): + with open(filename, "rb") as source_file: + job = bq_client.load_table_from_file( + source_file, table_id_full, job_config=job_config + ) + job.result() # wait for the load to complete - job.result() # wait for the load to complete + helpers.retry_403(create_table)() - table_ref = _TABLE_FORMAT.format(table.project, table.dataset_id, table.table_id) + table_ref = _TABLE_FORMAT.format(project_id, dataset.dataset_id, table_id) yield table_ref - # truncate table data - query = "DELETE FROM {}.{} WHERE 1 = 1".format(dataset.dataset_id, table.table_id) - query_job = bq_client.query(query, location="US") - query_job.result() + helpers.retry_403(bq_client.delete_table)(table_id_full, not_found_ok=True) diff --git a/tests/system/helpers.py b/tests/system/helpers.py new file mode 100644 index 00000000..8a0e862e --- /dev/null +++ b/tests/system/helpers.py @@ -0,0 +1,36 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with 
the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test utilities. + +Copied from the BigQuery client library +https://github.com/googleapis/python-bigquery/blob/master/tests/system/helpers.py +""" + +import google.api_core.exceptions +import test_utils.retry + + +def _rate_limit_exceeded(forbidden): + """Predicate: pass only exceptions with 'rateLimitExceeded' as reason.""" + return any(error["reason"] == "rateLimitExceeded" for error in forbidden._errors) + + +# We need to wait to stay within the rate limits. +# The alternative outcome is a 403 Forbidden response from upstream, which +# they return instead of the more appropriate 429. +# See https://cloud.google.com/bigquery/quota-policy +retry_403 = test_utils.retry.RetryErrors( + google.api_core.exceptions.Forbidden, error_predicate=_rate_limit_exceeded, +) diff --git a/tests/system/reader/__init__.py b/tests/system/reader/__init__.py new file mode 100644 index 00000000..7e1ec16e --- /dev/null +++ b/tests/system/reader/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/system/reader/test_reader.py b/tests/system/reader/test_reader.py index d0328041..aae3f6ec 100644 --- a/tests/system/reader/test_reader.py +++ b/tests/system/reader/test_reader.py @@ -20,10 +20,14 @@ import decimal import re +from google.cloud import bigquery import pytest import pytz -from google.cloud import bigquery +from .. 
import helpers + + +_TABLE_FORMAT = "projects/{}/datasets/{}/tables/{}" def _to_bq_table_ref(table_name_string, partition_suffix=""): @@ -177,25 +181,40 @@ def test_column_selection_read( assert sorted(row.keys()) == ["age", "first_name"] -def test_snapshot(client_and_types, project_id, table_with_data_ref, bq_client): +@pytest.mark.parametrize("data_format", ("AVRO", "ARROW")) +def test_snapshot(client_and_types, project_id, table, bq_client, data_format): client, types = client_and_types - before_new_data = dt.datetime.now(tz=dt.timezone.utc) + + def load_json(data): + return bq_client.load_table_from_json(data, table).result() + + # load original data into the table + original_data = [ + {"first_name": "OGFoo", "last_name": "Smith", "age": 44}, + {"first_name": "OGBar", "last_name": "Jones", "age": 33}, + ] + og_job = helpers.retry_403(load_json)(original_data) + og_time = og_job.ended # load additional data into the table new_data = [ - {"first_name": "NewGuyFoo", "last_name": "Smith", "age": 46}, - {"first_name": "NewGuyBar", "last_name": "Jones", "age": 30}, + {"first_name": "NewFoo", "last_name": "Smiff", "age": 43}, + {"first_name": "NewBar", "last_name": "Jomes", "age": 34}, ] + new_job = helpers.retry_403(load_json)(new_data) + new_time = new_job.ended - destination = _to_bq_table_ref(table_with_data_ref) - bq_client.load_table_from_json(new_data, destination).result() + # Because we want our snapshot to be between when we loaded the original + # data and when the new data was loaded, take the average of the two load + # job completion times. + before_new_data = og_time + ((new_time - og_time) / 2) # read data using the timestamp before the additional data load - + table_path = _TABLE_FORMAT.format(table.project, table.dataset_id, table.table_id) read_session = types.ReadSession() - read_session.table = table_with_data_ref + read_session.table = table_path read_session.table_modifiers.snapshot_time = before_new_data - read_session.data_format = types.DataFormat.AVRO + read_session.data_format = data_format session = client.create_read_session( request={ @@ -209,10 +228,11 @@ def test_snapshot(client_and_types, project_id, table_with_data_ref, bq_client): rows = list(client.read_rows(stream).rows(session)) # verify that only the data before the timestamp was returned - assert len(rows) == 5 # all initial records + assert len(rows) == 2 # all initial records for row in rows: - assert "NewGuy" not in row["first_name"] # no new records + assert "OG" in str(row["first_name"]) + assert "New" not in str(row["first_name"]) def test_column_partitioned_table( diff --git a/tests/unit/gapic/bigquery_storage_v1/__init__.py b/tests/unit/gapic/bigquery_storage_v1/__init__.py index 8b137891..42ffdf2b 100644 --- a/tests/unit/gapic/bigquery_storage_v1/__init__.py +++ b/tests/unit/gapic/bigquery_storage_v1/__init__.py @@ -1 +1,16 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# diff --git a/tests/unit/gapic/bigquery_storage_v1/test_big_query_read.py b/tests/unit/gapic/bigquery_storage_v1/test_big_query_read.py index 96f2fd43..5f0ba847 100644 --- a/tests/unit/gapic/bigquery_storage_v1/test_big_query_read.py +++ b/tests/unit/gapic/bigquery_storage_v1/test_big_query_read.py @@ -86,15 +86,17 @@ def test__get_default_mtls_endpoint(): assert BigQueryReadClient._get_default_mtls_endpoint(non_googleapi) == non_googleapi -def test_big_query_read_client_from_service_account_info(): +@pytest.mark.parametrize("client_class", [BigQueryReadClient, BigQueryReadAsyncClient,]) +def test_big_query_read_client_from_service_account_info(client_class): creds = credentials.AnonymousCredentials() with mock.patch.object( service_account.Credentials, "from_service_account_info" ) as factory: factory.return_value = creds info = {"valid": True} - client = BigQueryReadClient.from_service_account_info(info) + client = client_class.from_service_account_info(info) assert client.transport._credentials == creds + assert isinstance(client, client_class) assert client.transport._host == "bigquerystorage.googleapis.com:443" @@ -108,9 +110,11 @@ def test_big_query_read_client_from_service_account_file(client_class): factory.return_value = creds client = client_class.from_service_account_file("dummy/file/path.json") assert client.transport._credentials == creds + assert isinstance(client, client_class) client = client_class.from_service_account_json("dummy/file/path.json") assert client.transport._credentials == creds + assert isinstance(client, client_class) assert client.transport._host == "bigquerystorage.googleapis.com:443" @@ -476,6 +480,24 @@ def test_create_read_session_from_dict(): test_create_read_session(request_type=dict) +def test_create_read_session_empty_call(): + # This test is a coverage failsafe to make sure that totally empty calls, + # i.e. request == None and no flattened fields passed, work. + client = BigQueryReadClient( + credentials=credentials.AnonymousCredentials(), transport="grpc", + ) + + # Mock the actual call within the gRPC stub, and fake the request. + with mock.patch.object( + type(client.transport.create_read_session), "__call__" + ) as call: + client.create_read_session() + call.assert_called() + _, args, _ = call.mock_calls[0] + + assert args[0] == storage.CreateReadSessionRequest() + + @pytest.mark.asyncio async def test_create_read_session_async( transport: str = "grpc_asyncio", request_type=storage.CreateReadSessionRequest @@ -705,6 +727,22 @@ def test_read_rows_from_dict(): test_read_rows(request_type=dict) +def test_read_rows_empty_call(): + # This test is a coverage failsafe to make sure that totally empty calls, + # i.e. request == None and no flattened fields passed, work. + client = BigQueryReadClient( + credentials=credentials.AnonymousCredentials(), transport="grpc", + ) + + # Mock the actual call within the gRPC stub, and fake the request. + with mock.patch.object(type(client.transport.read_rows), "__call__") as call: + client.read_rows() + call.assert_called() + _, args, _ = call.mock_calls[0] + + assert args[0] == storage.ReadRowsRequest() + + @pytest.mark.asyncio async def test_read_rows_async( transport: str = "grpc_asyncio", request_type=storage.ReadRowsRequest @@ -901,6 +939,24 @@ def test_split_read_stream_from_dict(): test_split_read_stream(request_type=dict) +def test_split_read_stream_empty_call(): + # This test is a coverage failsafe to make sure that totally empty calls, + # i.e. request == None and no flattened fields passed, work. 
+ client = BigQueryReadClient( + credentials=credentials.AnonymousCredentials(), transport="grpc", + ) + + # Mock the actual call within the gRPC stub, and fake the request. + with mock.patch.object( + type(client.transport.split_read_stream), "__call__" + ) as call: + client.split_read_stream() + call.assert_called() + _, args, _ = call.mock_calls[0] + + assert args[0] == storage.SplitReadStreamRequest() + + @pytest.mark.asyncio async def test_split_read_stream_async( transport: str = "grpc_asyncio", request_type=storage.SplitReadStreamRequest diff --git a/tests/unit/gapic/bigquery_storage_v1beta2/__init__.py b/tests/unit/gapic/bigquery_storage_v1beta2/__init__.py index 8b137891..42ffdf2b 100644 --- a/tests/unit/gapic/bigquery_storage_v1beta2/__init__.py +++ b/tests/unit/gapic/bigquery_storage_v1beta2/__init__.py @@ -1 +1,16 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/tests/unit/gapic/bigquery_storage_v1beta2/test_big_query_read.py b/tests/unit/gapic/bigquery_storage_v1beta2/test_big_query_read.py index 47dd3dfc..12c49412 100644 --- a/tests/unit/gapic/bigquery_storage_v1beta2/test_big_query_read.py +++ b/tests/unit/gapic/bigquery_storage_v1beta2/test_big_query_read.py @@ -88,15 +88,17 @@ def test__get_default_mtls_endpoint(): assert BigQueryReadClient._get_default_mtls_endpoint(non_googleapi) == non_googleapi -def test_big_query_read_client_from_service_account_info(): +@pytest.mark.parametrize("client_class", [BigQueryReadClient, BigQueryReadAsyncClient,]) +def test_big_query_read_client_from_service_account_info(client_class): creds = credentials.AnonymousCredentials() with mock.patch.object( service_account.Credentials, "from_service_account_info" ) as factory: factory.return_value = creds info = {"valid": True} - client = BigQueryReadClient.from_service_account_info(info) + client = client_class.from_service_account_info(info) assert client.transport._credentials == creds + assert isinstance(client, client_class) assert client.transport._host == "bigquerystorage.googleapis.com:443" @@ -110,9 +112,11 @@ def test_big_query_read_client_from_service_account_file(client_class): factory.return_value = creds client = client_class.from_service_account_file("dummy/file/path.json") assert client.transport._credentials == creds + assert isinstance(client, client_class) client = client_class.from_service_account_json("dummy/file/path.json") assert client.transport._credentials == creds + assert isinstance(client, client_class) assert client.transport._host == "bigquerystorage.googleapis.com:443" @@ -478,6 +482,24 @@ def test_create_read_session_from_dict(): test_create_read_session(request_type=dict) +def test_create_read_session_empty_call(): + # This test is a coverage failsafe to make sure that totally empty calls, + # i.e. request == None and no flattened fields passed, work. 
+ client = BigQueryReadClient( + credentials=credentials.AnonymousCredentials(), transport="grpc", + ) + + # Mock the actual call within the gRPC stub, and fake the request. + with mock.patch.object( + type(client.transport.create_read_session), "__call__" + ) as call: + client.create_read_session() + call.assert_called() + _, args, _ = call.mock_calls[0] + + assert args[0] == storage.CreateReadSessionRequest() + + @pytest.mark.asyncio async def test_create_read_session_async( transport: str = "grpc_asyncio", request_type=storage.CreateReadSessionRequest @@ -707,6 +729,22 @@ def test_read_rows_from_dict(): test_read_rows(request_type=dict) +def test_read_rows_empty_call(): + # This test is a coverage failsafe to make sure that totally empty calls, + # i.e. request == None and no flattened fields passed, work. + client = BigQueryReadClient( + credentials=credentials.AnonymousCredentials(), transport="grpc", + ) + + # Mock the actual call within the gRPC stub, and fake the request. + with mock.patch.object(type(client.transport.read_rows), "__call__") as call: + client.read_rows() + call.assert_called() + _, args, _ = call.mock_calls[0] + + assert args[0] == storage.ReadRowsRequest() + + @pytest.mark.asyncio async def test_read_rows_async( transport: str = "grpc_asyncio", request_type=storage.ReadRowsRequest @@ -903,6 +941,24 @@ def test_split_read_stream_from_dict(): test_split_read_stream(request_type=dict) +def test_split_read_stream_empty_call(): + # This test is a coverage failsafe to make sure that totally empty calls, + # i.e. request == None and no flattened fields passed, work. + client = BigQueryReadClient( + credentials=credentials.AnonymousCredentials(), transport="grpc", + ) + + # Mock the actual call within the gRPC stub, and fake the request. 
+ with mock.patch.object( + type(client.transport.split_read_stream), "__call__" + ) as call: + client.split_read_stream() + call.assert_called() + _, args, _ = call.mock_calls[0] + + assert args[0] == storage.SplitReadStreamRequest() + + @pytest.mark.asyncio async def test_split_read_stream_async( transport: str = "grpc_asyncio", request_type=storage.SplitReadStreamRequest diff --git a/tests/unit/gapic/bigquery_storage_v1beta2/test_big_query_write.py b/tests/unit/gapic/bigquery_storage_v1beta2/test_big_query_write.py index 8d9112b9..8cfb4aaf 100644 --- a/tests/unit/gapic/bigquery_storage_v1beta2/test_big_query_write.py +++ b/tests/unit/gapic/bigquery_storage_v1beta2/test_big_query_write.py @@ -94,15 +94,19 @@ def test__get_default_mtls_endpoint(): ) -def test_big_query_write_client_from_service_account_info(): +@pytest.mark.parametrize( + "client_class", [BigQueryWriteClient, BigQueryWriteAsyncClient,] +) +def test_big_query_write_client_from_service_account_info(client_class): creds = credentials.AnonymousCredentials() with mock.patch.object( service_account.Credentials, "from_service_account_info" ) as factory: factory.return_value = creds info = {"valid": True} - client = BigQueryWriteClient.from_service_account_info(info) + client = client_class.from_service_account_info(info) assert client.transport._credentials == creds + assert isinstance(client, client_class) assert client.transport._host == "bigquerystorage.googleapis.com:443" @@ -118,9 +122,11 @@ def test_big_query_write_client_from_service_account_file(client_class): factory.return_value = creds client = client_class.from_service_account_file("dummy/file/path.json") assert client.transport._credentials == creds + assert isinstance(client, client_class) client = client_class.from_service_account_json("dummy/file/path.json") assert client.transport._credentials == creds + assert isinstance(client, client_class) assert client.transport._host == "bigquerystorage.googleapis.com:443" @@ -487,6 +493,24 @@ def test_create_write_stream_from_dict(): test_create_write_stream(request_type=dict) +def test_create_write_stream_empty_call(): + # This test is a coverage failsafe to make sure that totally empty calls, + # i.e. request == None and no flattened fields passed, work. + client = BigQueryWriteClient( + credentials=credentials.AnonymousCredentials(), transport="grpc", + ) + + # Mock the actual call within the gRPC stub, and fake the request. + with mock.patch.object( + type(client.transport.create_write_stream), "__call__" + ) as call: + client.create_write_stream() + call.assert_called() + _, args, _ = call.mock_calls[0] + + assert args[0] == storage.CreateWriteStreamRequest() + + @pytest.mark.asyncio async def test_create_write_stream_async( transport: str = "grpc_asyncio", request_type=storage.CreateWriteStreamRequest @@ -777,6 +801,22 @@ def test_get_write_stream_from_dict(): test_get_write_stream(request_type=dict) +def test_get_write_stream_empty_call(): + # This test is a coverage failsafe to make sure that totally empty calls, + # i.e. request == None and no flattened fields passed, work. + client = BigQueryWriteClient( + credentials=credentials.AnonymousCredentials(), transport="grpc", + ) + + # Mock the actual call within the gRPC stub, and fake the request. 
+ with mock.patch.object(type(client.transport.get_write_stream), "__call__") as call: + client.get_write_stream() + call.assert_called() + _, args, _ = call.mock_calls[0] + + assert args[0] == storage.GetWriteStreamRequest() + + @pytest.mark.asyncio async def test_get_write_stream_async( transport: str = "grpc_asyncio", request_type=storage.GetWriteStreamRequest @@ -970,6 +1010,24 @@ def test_finalize_write_stream_from_dict(): test_finalize_write_stream(request_type=dict) +def test_finalize_write_stream_empty_call(): + # This test is a coverage failsafe to make sure that totally empty calls, + # i.e. request == None and no flattened fields passed, work. + client = BigQueryWriteClient( + credentials=credentials.AnonymousCredentials(), transport="grpc", + ) + + # Mock the actual call within the gRPC stub, and fake the request. + with mock.patch.object( + type(client.transport.finalize_write_stream), "__call__" + ) as call: + client.finalize_write_stream() + call.assert_called() + _, args, _ = call.mock_calls[0] + + assert args[0] == storage.FinalizeWriteStreamRequest() + + @pytest.mark.asyncio async def test_finalize_write_stream_async( transport: str = "grpc_asyncio", request_type=storage.FinalizeWriteStreamRequest @@ -1171,6 +1229,24 @@ def test_batch_commit_write_streams_from_dict(): test_batch_commit_write_streams(request_type=dict) +def test_batch_commit_write_streams_empty_call(): + # This test is a coverage failsafe to make sure that totally empty calls, + # i.e. request == None and no flattened fields passed, work. + client = BigQueryWriteClient( + credentials=credentials.AnonymousCredentials(), transport="grpc", + ) + + # Mock the actual call within the gRPC stub, and fake the request. + with mock.patch.object( + type(client.transport.batch_commit_write_streams), "__call__" + ) as call: + client.batch_commit_write_streams() + call.assert_called() + _, args, _ = call.mock_calls[0] + + assert args[0] == storage.BatchCommitWriteStreamsRequest() + + @pytest.mark.asyncio async def test_batch_commit_write_streams_async( transport: str = "grpc_asyncio", request_type=storage.BatchCommitWriteStreamsRequest @@ -1368,6 +1444,22 @@ def test_flush_rows_from_dict(): test_flush_rows(request_type=dict) +def test_flush_rows_empty_call(): + # This test is a coverage failsafe to make sure that totally empty calls, + # i.e. request == None and no flattened fields passed, work. + client = BigQueryWriteClient( + credentials=credentials.AnonymousCredentials(), transport="grpc", + ) + + # Mock the actual call within the gRPC stub, and fake the request. 
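# --- Editor's aside (not part of the diff): the *_empty_call tests added
# throughout this change all follow the same shape: patch the transport
# method, call the client method with no arguments, and assert that the
# default request type was sent. A hedged sketch of how the same coverage
# could be expressed once with pytest.mark.parametrize; the method/request
# pairs are taken from the hunks above, but the helper itself is illustrative
# and is not how the generated tests are written.
import pytest
from unittest import mock
from google.auth import credentials
from google.cloud.bigquery_storage_v1beta2.services.big_query_write import (
    BigQueryWriteClient,
)
from google.cloud.bigquery_storage_v1beta2.types import storage


@pytest.mark.parametrize(
    "method_name, request_type",
    [
        ("create_write_stream", storage.CreateWriteStreamRequest),
        ("get_write_stream", storage.GetWriteStreamRequest),
        ("finalize_write_stream", storage.FinalizeWriteStreamRequest),
        ("batch_commit_write_streams", storage.BatchCommitWriteStreamsRequest),
        ("flush_rows", storage.FlushRowsRequest),
    ],
)
def test_write_client_empty_calls(method_name, request_type):
    client = BigQueryWriteClient(
        credentials=credentials.AnonymousCredentials(), transport="grpc",
    )
    transport_method = getattr(client.transport, method_name)
    with mock.patch.object(type(transport_method), "__call__") as call:
        getattr(client, method_name)()
        call.assert_called()
        _, args, _ = call.mock_calls[0]
        assert args[0] == request_type()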
+ with mock.patch.object(type(client.transport.flush_rows), "__call__") as call: + client.flush_rows() + call.assert_called() + _, args, _ = call.mock_calls[0] + + assert args[0] == storage.FlushRowsRequest() + + @pytest.mark.asyncio async def test_flush_rows_async( transport: str = "grpc_asyncio", request_type=storage.FlushRowsRequest diff --git a/tests/unit/test_reader_v1.py b/tests/unit/test_reader_v1.py index fcefbca7..7fb8d5a4 100644 --- a/tests/unit/test_reader_v1.py +++ b/tests/unit/test_reader_v1.py @@ -427,7 +427,9 @@ def test_to_dataframe_empty_w_scalars_avro(class_under_test): expected["int_col"] = expected["int_col"].astype("int64") expected["float_col"] = expected["float_col"].astype("float64") expected["bool_col"] = expected["bool_col"].astype("bool") - expected["ts_col"] = expected["ts_col"].astype("datetime64[ns, UTC]") + expected["ts_col"] = ( + expected["ts_col"].astype("datetime64[ns]").dt.tz_localize("UTC") + ) pandas.testing.assert_frame_equal( got.reset_index(drop=True), # reset_index to ignore row labels diff --git a/tests/unit/test_reader_v1_arrow.py b/tests/unit/test_reader_v1_arrow.py index 202e0d81..492098f5 100644 --- a/tests/unit/test_reader_v1_arrow.py +++ b/tests/unit/test_reader_v1_arrow.py @@ -249,7 +249,9 @@ def test_to_dataframe_empty_w_scalars_arrow(class_under_test): expected["int_col"] = expected["int_col"].astype("int64") expected["float_col"] = expected["float_col"].astype("float64") expected["bool_col"] = expected["bool_col"].astype("bool") - expected["ts_col"] = expected["ts_col"].astype("datetime64[ns, UTC]") + expected["ts_col"] = ( + expected["ts_col"].astype("datetime64[ns]").dt.tz_localize("UTC") + ) pandas.testing.assert_frame_equal( got.reset_index(drop=True), # reset_index to ignore row labels
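# --- A minimal sketch of the expected-dtype change in the two reader tests
# above: localizing a naive datetime64[ns] series yields the same tz-aware
# dtype as casting directly to "datetime64[ns, UTC]", and is assumed here to
# be the more portable construction across pandas versions (the rationale is
# inferred, not stated in the diff).
import pandas

empty = pandas.Series([], dtype="object")
localized = empty.astype("datetime64[ns]").dt.tz_localize("UTC")
print(localized.dtype)  # datetime64[ns, UTC], same dtype the assertion expects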