diff --git a/.github/.OwlBot.lock.yaml b/.github/.OwlBot.lock.yaml
index ef3cb34f..c07f148f 100644
--- a/.github/.OwlBot.lock.yaml
+++ b/.github/.OwlBot.lock.yaml
@@ -1,3 +1,3 @@
docker:
image: gcr.io/repo-automation-bots/owlbot-python:latest
- digest: sha256:1456ea2b3b523ccff5e13030acef56d1de28f21249c62aa0f196265880338fa7
+ digest: sha256:0ffe3bdd6c7159692df5f7744da74e5ef19966288a6bf76023e8e04e0c424d7d
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 477b755b..4a0c8ade 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,13 @@
[1]: https://pypi.org/project/google-cloud-bigquery-storage/#history
+## [2.8.0](https://www.github.com/googleapis/python-bigquery-storage/compare/v2.7.0...v2.8.0) (2021-09-10)
+
+
+### Features
+
+* add `AppendRowsStream` helper to append rows with a `BigQueryWriteClient` ([#284](https://www.github.com/googleapis/python-bigquery-storage/issues/284)) ([2461f63](https://www.github.com/googleapis/python-bigquery-storage/commit/2461f63d37f707c2d634a95d87b8ffc3e4af3686))
+
## [2.7.0](https://www.github.com/googleapis/python-bigquery-storage/compare/v2.6.3...v2.7.0) (2021-09-02)
diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index f6de097e..b1fd7283 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -113,9 +113,9 @@ Coding Style
export GOOGLE_CLOUD_TESTING_BRANCH="main"
By doing this, you are specifying the location of the most up-to-date
- version of ``python-bigquery-storage``. The the suggested remote name ``upstream``
- should point to the official ``googleapis`` checkout and the
- the branch should be the main branch on that remote (``main``).
+ version of ``python-bigquery-storage``. The
+ remote name ``upstream`` should point to the official ``googleapis``
+ checkout and the branch should be the default branch on that remote (``main``).
- This repository contains configuration for the
`pre-commit <https://pre-commit.com/>`__ tool, which automates checking
diff --git a/README.rst b/README.rst
index 27d3f4fd..ecd5e44c 100644
--- a/README.rst
+++ b/README.rst
@@ -9,7 +9,7 @@ Python Client for BigQuery Storage API
- `Product Documentation`_
.. |ga| image:: https://img.shields.io/badge/support-GA-gold.svg
- :target: https://github.com/googleapis/google-cloud-python/blob/master/README.rst#general-availability
+ :target: https://github.com/googleapis/google-cloud-python/blob/main/README.rst#general-availability
.. |pypi| image:: https://img.shields.io/pypi/v/google-cloud-bigquery-storage.svg
:target: https://pypi.org/project/google-cloud-bigquery-storage/
.. |versions| image:: https://img.shields.io/pypi/pyversions/google-cloud-bigquery-storage.svg
@@ -113,4 +113,4 @@ Next Steps
APIs that we cover.
.. _BigQuery Storage API Product documentation: https://cloud.google.com/bigquery/docs/reference/storage/
-.. _repository’s main README: https://github.com/googleapis/google-cloud-python/blob/master/README.rst
+.. _repository’s main README: https://github.com/googleapis/google-cloud-python/blob/main/README.rst
diff --git a/docs/bigquery_storage_v1beta2/library.rst b/docs/bigquery_storage_v1beta2/library.rst
index 4e25d9d8..fd7dcb36 100644
--- a/docs/bigquery_storage_v1beta2/library.rst
+++ b/docs/bigquery_storage_v1beta2/library.rst
@@ -4,3 +4,7 @@ Bigquery Storage v1beta2 API Library
.. automodule:: google.cloud.bigquery_storage_v1beta2.client
:members:
:inherited-members:
+
+.. automodule:: google.cloud.bigquery_storage_v1beta2.writer
+ :members:
+ :inherited-members:
diff --git a/google/cloud/bigquery_storage_v1beta2/__init__.py b/google/cloud/bigquery_storage_v1beta2/__init__.py
index 6d0b34e1..8efde268 100644
--- a/google/cloud/bigquery_storage_v1beta2/__init__.py
+++ b/google/cloud/bigquery_storage_v1beta2/__init__.py
@@ -30,10 +30,15 @@ class BigQueryReadClient(client.BigQueryReadClient):
__doc__ = client.BigQueryReadClient.__doc__
+class BigQueryWriteClient(client.BigQueryWriteClient):
+ __doc__ = client.BigQueryWriteClient.__doc__
+
+
__all__ = (
# google.cloud.bigquery_storage_v1beta2
"__version__",
"types",
# google.cloud.bigquery_storage_v1beta2.client
"BigQueryReadClient",
+ "BigQueryWriteClient",
)
diff --git a/google/cloud/bigquery_storage_v1beta2/client.py b/google/cloud/bigquery_storage_v1beta2/client.py
index f2776a20..00bff3ff 100644
--- a/google/cloud/bigquery_storage_v1beta2/client.py
+++ b/google/cloud/bigquery_storage_v1beta2/client.py
@@ -19,12 +19,14 @@
This is the base from which all interactions with the API occur.
"""
-from __future__ import absolute_import
-
import google.api_core.gapic_v1.method
+import google.api_core.retry
from google.cloud.bigquery_storage_v1 import reader
-from google.cloud.bigquery_storage_v1beta2.services import big_query_read
+from google.cloud.bigquery_storage_v1beta2.services import (
+ big_query_read,
+ big_query_write,
+)
_SCOPES = (
@@ -135,3 +137,7 @@ def read_rows(
offset,
{"retry": retry, "timeout": timeout, "metadata": metadata},
)
+
+
+class BigQueryWriteClient(big_query_write.BigQueryWriteClient):
+ __doc__ = big_query_write.BigQueryWriteClient.__doc__
diff --git a/google/cloud/bigquery_storage_v1beta2/exceptions.py b/google/cloud/bigquery_storage_v1beta2/exceptions.py
new file mode 100644
index 00000000..0887071d
--- /dev/null
+++ b/google/cloud/bigquery_storage_v1beta2/exceptions.py
@@ -0,0 +1,17 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class StreamClosedError(Exception):
+ """Operation not supported while stream is closed."""
diff --git a/google/cloud/bigquery_storage_v1beta2/writer.py b/google/cloud/bigquery_storage_v1beta2/writer.py
new file mode 100644
index 00000000..3a77a07e
--- /dev/null
+++ b/google/cloud/bigquery_storage_v1beta2/writer.py
@@ -0,0 +1,412 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from __future__ import division
+
+import itertools
+import logging
+import queue
+import time
+import threading
+from typing import Callable, Optional, Sequence, Tuple
+
+from google.api_core import bidi
+from google.api_core.future import polling as polling_future
+from google.api_core import exceptions
+import google.api_core.retry
+import grpc
+
+from google.cloud.bigquery_storage_v1beta2 import exceptions as bqstorage_exceptions
+from google.cloud.bigquery_storage_v1beta2 import types as gapic_types
+from google.cloud.bigquery_storage_v1beta2.services import big_query_write
+
+_LOGGER = logging.getLogger(__name__)
+_REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown"
+_RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated"
+
+# open() takes between 0.25 and 0.4 seconds to be ready. Wait this long each
+# loop before checking again. This interval was chosen to result in about 3
+# loops.
+_WRITE_OPEN_INTERVAL = 0.08
+
+# Use a default timeout that is quite long, to avoid potential infinite
+# loops, but that still works for all expected requests.
+_DEFAULT_TIMEOUT = 600
+
+
+def _wrap_as_exception(maybe_exception) -> Exception:
+ """Wrap an object as a Python exception, if needed.
+ Args:
+ maybe_exception (Any): The object to wrap, usually a gRPC exception class.
+ Returns:
+ The argument itself if an instance of ``BaseException``, otherwise
+ the argument represented as an instance of ``Exception`` (sub)class.
+ """
+ if isinstance(maybe_exception, grpc.RpcError):
+ return exceptions.from_grpc_error(maybe_exception)
+ elif isinstance(maybe_exception, BaseException):
+ return maybe_exception
+
+ return Exception(maybe_exception)
+
+
+class AppendRowsStream(object):
+ """A manager object which can append rows to a stream."""
+
+ def __init__(
+ self,
+ client: big_query_write.BigQueryWriteClient,
+ initial_request_template: gapic_types.AppendRowsRequest,
+ metadata: Sequence[Tuple[str, str]] = (),
+ ):
+ """Construct a stream manager.
+
+ Args:
+ client:
+ Client responsible for making requests.
+ initial_request_template:
+ Data to include in the first request sent to the stream. This
+ must contain
+ :attr:`google.cloud.bigquery_storage_v1beta2.types.AppendRowsRequest.write_stream`
+ and
+ :attr:`google.cloud.bigquery_storage_v1beta2.types.AppendRowsRequest.ProtoData.writer_schema`.
+ metadata:
+ Extra headers to include when sending the streaming request.
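+
+        Example:
+            A minimal usage sketch, assuming ``client`` is a
+            :class:`~google.cloud.bigquery_storage_v1beta2.BigQueryWriteClient`
+            and the stream path and row payloads are placeholders (see
+            ``samples/snippets/append_rows_proto2.py`` for a complete
+            example)::
+
+                from google.cloud.bigquery_storage_v1beta2 import types
+
+                template = types.AppendRowsRequest()
+                template.write_stream = "projects/p/datasets/d/tables/t/streams/s"
+                # template.proto_rows.writer_schema must also be populated.
+
+                manager = AppendRowsStream(client, template)
+                request = types.AppendRowsRequest()
+                # ... populate request.proto_rows.rows with serialized rows ...
+                future = manager.send(request)  # the first send() opens the stream
+                print(future.result())
+                manager.close()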
+ """
+ self._client = client
+ self._closing = threading.Lock()
+ self._closed = False
+ self._close_callbacks = []
+ self._futures_queue = queue.Queue()
+        self._initial_request_template = initial_request_template
+ self._metadata = metadata
+ self._rpc = None
+ self._stream_name = None
+
+ # The threads created in ``.open()``.
+ self._consumer = None
+
+ @property
+ def is_active(self) -> bool:
+ """bool: True if this manager is actively streaming.
+
+        Note that ``False`` does not indicate the manager is completely shut
+        down, just that it has stopped receiving new messages.
+ """
+ return self._consumer is not None and self._consumer.is_active
+
+ def add_close_callback(self, callback: Callable):
+ """Schedules a callable when the manager closes.
+ Args:
+ callback (Callable): The method to call.
+ """
+ self._close_callbacks.append(callback)
+
+ def open(
+ self,
+ initial_request: gapic_types.AppendRowsRequest,
+ timeout: float = _DEFAULT_TIMEOUT,
+ ) -> "AppendRowsFuture":
+ """Open an append rows stream.
+
+ This is automatically called by the first call to the
+        :meth:`google.cloud.bigquery_storage_v1beta2.writer.AppendRowsStream.send`
+ method.
+
+ Args:
+ initial_request:
+                The initial request to start the stream. Must have the
+                :attr:`google.cloud.bigquery_storage_v1beta2.types.AppendRowsRequest.write_stream`
+                and ``proto_rows.writer_schema.proto_descriptor``
+                properties populated.
+ timeout:
+ How long (in seconds) to wait for the stream to be ready.
+
+ Returns:
+ A future, which can be used to process the response to the initial
+ request when it arrives.
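+
+        Example:
+            A sketch of opening explicitly instead of relying on the first
+            :meth:`send` call (``request`` is assumed to be a fully populated
+            :class:`~google.cloud.bigquery_storage_v1beta2.types.AppendRowsRequest`)::
+
+                initial_response_future = manager.open(request)
+                print(initial_response_future.result())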
+ """
+ if self.is_active:
+ raise ValueError("This manager is already open.")
+
+ if self._closed:
+ raise bqstorage_exceptions.StreamClosedError(
+ "This manager has been closed and can not be re-used."
+ )
+
+ start_time = time.monotonic()
+ request = gapic_types.AppendRowsRequest()
+        gapic_types.AppendRowsRequest.copy_from(request, self._initial_request_template)
+ request._pb.MergeFrom(initial_request._pb)
+ self._stream_name = request.write_stream
+
+        initial_response_future = AppendRowsFuture(self)
+        self._futures_queue.put(initial_response_future)
+
+ self._rpc = bidi.BidiRpc(
+ self._client.append_rows,
+ initial_request=request,
+ # TODO: pass in retry and timeout. Blocked by
+ # https://github.com/googleapis/python-api-core/issues/262
+ metadata=tuple(
+ itertools.chain(
+ self._metadata,
+ # This header is required so that the BigQuery Storage API
+ # knows which region to route the request to.
+ (("x-goog-request-params", f"write_stream={self._stream_name}"),),
+ )
+ ),
+ )
+ self._rpc.add_done_callback(self._on_rpc_done)
+
+ self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response)
+ self._consumer.start()
+
+ # Make sure RPC has started before returning.
+ # Without this, consumers may get:
+ #
+ # ValueError: Can not send() on an RPC that has never been open()ed.
+ #
+ # when they try to send a request.
+ while not self._rpc.is_active and self._consumer.is_active:
+ # Avoid 100% CPU while waiting for RPC to be ready.
+ time.sleep(_WRITE_OPEN_INTERVAL)
+
+ # TODO: Check retry.deadline instead of (per-request) timeout.
+ # Blocked by
+ # https://github.com/googleapis/python-api-core/issues/262
+ if timeout is None:
+ continue
+ current_time = time.monotonic()
+ if current_time - start_time > timeout:
+ break
+
+ # Something went wrong when opening the RPC.
+ if not self._consumer.is_active:
+ # TODO: Share the exception from _rpc.open(). Blocked by
+ # https://github.com/googleapis/python-api-core/issues/268
+ request_exception = exceptions.Unknown(
+ "There was a problem opening the stream. "
+ "Try turning on DEBUG level logs to see the error."
+ )
+ self.close(reason=request_exception)
+ raise request_exception
+
+        return initial_response_future
+
+ def send(self, request: gapic_types.AppendRowsRequest) -> "AppendRowsFuture":
+ """Send an append rows request to the open stream.
+
+ Args:
+ request:
+ The request to add to the stream.
+
+ Returns:
+ A future, which can be used to process the response when it
+ arrives.
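+
+        Example:
+            Each request gets its own future, and responses arrive in order
+            (a sketch; ``request_1`` and ``request_2`` are placeholders)::
+
+                future_1 = manager.send(request_1)
+                future_2 = manager.send(request_2)
+                print(future_1.result())
+                print(future_2.result())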
+ """
+ if self._closed:
+ raise bqstorage_exceptions.StreamClosedError(
+ "This manager has been closed and can not be used."
+ )
+
+        # If the manager hasn't been opened yet, automatically open it.
+ if not self.is_active:
+ return self.open(request)
+
+ # For each request, we expect exactly one response (in order). Add a
+ # future to the queue so that when the response comes, the callback can
+ # pull it off and notify completion.
+ future = AppendRowsFuture(self)
+ self._futures_queue.put(future)
+ self._rpc.send(request)
+ return future
+
+ def _on_response(self, response: gapic_types.AppendRowsResponse):
+ """Process a response from a consumer callback."""
+        # If the stream has closed, but somehow we still got a response message
+        # back, reject it. The response futures queue has already been drained,
+        # with an exception reported.
+ if self._closed:
+ raise bqstorage_exceptions.StreamClosedError(
+ f"Stream closed before receiving response: {response}"
+ )
+
+ # Since we have 1 response per request, if we get here from a response
+ # callback, the queue should never be empty.
+ future: AppendRowsFuture = self._futures_queue.get_nowait()
+ if response.error.code:
+ exc = exceptions.from_grpc_status(
+ response.error.code, response.error.message
+ )
+ future.set_exception(exc)
+ else:
+ future.set_result(response)
+
+ def close(self, reason: Optional[Exception] = None):
+ """Stop consuming messages and shutdown all helper threads.
+
+ This method is idempotent. Additional calls will have no effect.
+
+        This method does not block; it delegates the shutdown operations to a
+        background thread.
+
+ Args:
+ reason: The reason to close this. If ``None``, this is considered
+ an "intentional" shutdown. This is passed to the callbacks
+ specified via :meth:`add_close_callback`.
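+
+        Example:
+            A sketch of pairing a close callback with an intentional shutdown
+            (each callback receives this manager and the close reason)::
+
+                manager.add_close_callback(
+                    lambda manager, reason: print(f"closed, reason: {reason}")
+                )
+                manager.close()  # callbacks receive reason=None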
+ """
+ self._regular_shutdown_thread = threading.Thread(
+ name=_REGULAR_SHUTDOWN_THREAD_NAME,
+ daemon=True,
+ target=self._shutdown,
+ kwargs={"reason": reason},
+ )
+ self._regular_shutdown_thread.start()
+
+ def _shutdown(self, reason: Optional[Exception] = None):
+ """Run the actual shutdown sequence (stop the stream and all helper threads).
+
+ Args:
+ reason:
+ The reason to close the stream. If ``None``, this is
+ considered an "intentional" shutdown.
+ """
+ with self._closing:
+ if self._closed:
+ return
+
+ # Stop consuming messages.
+ if self.is_active:
+ _LOGGER.debug("Stopping consumer.")
+ self._consumer.stop()
+ self._consumer = None
+
+ self._rpc.close()
+ self._rpc = None
+ self._closed = True
+ _LOGGER.debug("Finished stopping manager.")
+
+ # We know that no new items will be added to the queue because
+ # we've marked the stream as closed.
+ while not self._futures_queue.empty():
+ # Mark each future as failed. Since the consumer thread has
+ # stopped (or at least is attempting to stop), we won't get
+ # response callbacks to populate the remaining futures.
+ future = self._futures_queue.get_nowait()
+ if reason is None:
+ exc = bqstorage_exceptions.StreamClosedError(
+ "Stream closed before receiving a response."
+ )
+ else:
+ exc = reason
+ future.set_exception(exc)
+
+ for callback in self._close_callbacks:
+ callback(self, reason)
+
+ def _on_rpc_done(self, future):
+ """Triggered whenever the underlying RPC terminates without recovery.
+
+ This is typically triggered from one of two threads: the background
+ consumer thread (when calling ``recv()`` produces a non-recoverable
+ error) or the grpc management thread (when cancelling the RPC).
+
+        This method is *non-blocking*. It will start another thread to deal
+        with shutting everything down. This is to prevent blocking in the
+        background consumer, which would prevent it from being ``joined()``.
+ """
+ _LOGGER.info("RPC termination has signaled streaming pull manager shutdown.")
+ error = _wrap_as_exception(future)
+ thread = threading.Thread(
+ name=_RPC_ERROR_THREAD_NAME, target=self._shutdown, kwargs={"reason": error}
+ )
+ thread.daemon = True
+ thread.start()
+
+
+class AppendRowsFuture(polling_future.PollingFuture):
+ """Encapsulation of the asynchronous execution of an action.
+
+ This object is returned from long-running BigQuery Storage API calls, and
+ is the interface to determine the status of those calls.
+
+ This object should not be created directly, but is returned by other
+ methods in this library.
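+
+    Example:
+        A sketch of the typical flow (``manager`` and ``request`` are
+        placeholders for an :class:`AppendRowsStream` and an append request)::
+
+            future = manager.send(request)
+            response = future.result()  # blocks until the server responds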
+ """
+
+ def __init__(self, manager: AppendRowsStream):
+ super().__init__()
+ self.__manager = manager
+ self.__cancelled = False
+ self._is_done = False
+
+ def cancel(self):
+ """Stops pulling messages and shutdowns the background thread consuming
+ messages.
+
+        This method does not block; it just triggers the shutdown and returns
+        immediately. To block until the background stream is terminated, call
+        :meth:`result()` after cancelling the future.
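+
+        Example:
+            A sketch of a blocking cancel::
+
+                from google.cloud.bigquery_storage_v1beta2.exceptions import (
+                    StreamClosedError,
+                )
+
+                future.cancel()
+                try:
+                    future.result()  # blocks until shutdown completes
+                except StreamClosedError:
+                    pass  # pending futures resolve with StreamClosedError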
+ """
+        # NOTE: We circumvent the base future's self._state to track the cancellation
+        # state, as that state has a different meaning for these streaming futures.
+        # See: https://github.com/googleapis/python-pubsub/pull/397
+ self.__cancelled = True
+ return self.__manager.close()
+
+ def cancelled(self):
+ """
+        Returns:
+ bool: ``True`` if the write stream has been cancelled.
+ """
+ return self.__cancelled
+
+ def done(self, retry: Optional[google.api_core.retry.Retry] = None) -> bool:
+ """Check the status of the future.
+
+ Args:
+ retry:
+                Not used. Included for compatibility with base class. Future
+ status is updated by a background thread.
+
+ Returns:
+ ``True`` if the request has finished, otherwise ``False``.
+ """
+ # Consumer should call set_result or set_exception method, where this
+ # gets set to True *after* first setting _result.
+ #
+ # Consumer runs in a background thread, but this access is thread-safe:
+ # https://docs.python.org/3/faq/library.html#what-kinds-of-global-value-mutation-are-thread-safe
+ return self._is_done
+
+ def set_exception(self, exception):
+ """Set the result of the future as being the given exception.
+
+        Do not use this method; it should only be used internally by the
+        library and its unit tests.
+ """
+ return_value = super().set_exception(exception=exception)
+ self._is_done = True
+ return return_value
+
+ def set_result(self, result):
+ """Set the return value of work associated with the future.
+
+        Do not use this method; it should only be used internally by the
+        library and its unit tests.
+ """
+ return_value = super().set_result(result=result)
+ self._is_done = True
+ return return_value
diff --git a/noxfile.py b/noxfile.py
index 3493eb27..98cd608b 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -94,7 +94,7 @@ def default(session):
constraints_path,
)
- session.install("-e", ".[fastavro,pandas,pyarrow]", "-c", constraints_path)
+ session.install("-e", ".[tests,fastavro,pandas,pyarrow]", "-c", constraints_path)
# Run py.test against the unit tests.
session.run(
diff --git a/owlbot.py b/owlbot.py
index b96b16a4..3d6771db 100644
--- a/owlbot.py
+++ b/owlbot.py
@@ -14,8 +14,6 @@
"""This script is used to synthesize generated parts of this library."""
-import re
-
import synthtool as s
from synthtool import gcp
from synthtool.languages import python
@@ -39,7 +37,7 @@
# wraps it.
s.replace(
library / "google/cloud/bigquery_storage/__init__.py",
- f"from google\.cloud\.bigquery_storage_{library.name}\.services.big_query_read.client import",
+ f"from google\\.cloud\\.bigquery_storage_{library.name}\\.services.big_query_read.client import",
f"from google.cloud.bigquery_storage_{library.name} import",
)
@@ -48,7 +46,7 @@
s.replace(
library / "google/cloud/bigquery_storage/__init__.py",
(
- f"from google\.cloud\.bigquery_storage_{library.name}\.services.big_query_read.async_client "
+ f"from google\\.cloud\\.bigquery_storage_{library.name}\\.services.big_query_read.async_client "
r"import BigQueryReadAsyncClient\n"
),
"",
@@ -63,17 +61,17 @@
# entry point.
s.replace(
library / "google/cloud/bigquery_storage/__init__.py",
- f"from google\.cloud\.bigquery_storage_{library.name}\.types\.arrow import ArrowRecordBatch",
+ f"from google\\.cloud\\.bigquery_storage_{library.name}\\.types\\.arrow import ArrowRecordBatch",
(
f"from google.cloud.bigquery_storage_{library.name} import types\n"
f"from google.cloud.bigquery_storage_{library.name} import __version__\n"
- "\g<0>"
+ "\\g<0>"
),
)
s.replace(
library / "google/cloud/bigquery_storage/__init__.py",
r"""["']ArrowRecordBatch["']""",
- ('"__version__",\n' ' "types",\n' " \g<0>"),
+ ('"__version__",\n' ' "types",\n' " \\g<0>"),
)
# We want to expose all types through "google.cloud.bigquery_storage.types",
@@ -89,12 +87,12 @@
s.replace(
library / f"google/cloud/bigquery_storage_{library.name}*/types/__init__.py",
r"from \.stream import \(",
- "\g<0>\n DataFormat,",
+ "\\g<0>\n DataFormat,",
)
s.replace(
library / f"google/cloud/bigquery_storage_{library.name}*/types/__init__.py",
r"""["']ReadSession["']""",
- '"DataFormat",\n \g<0>',
+ '"DataFormat",\n \\g<0>',
)
# The append_rows method doesn't contain keyword arguments that build request
@@ -130,11 +128,12 @@
# Add templated files
# ----------------------------------------------------------------------------
extras = ["fastavro", "pandas", "pyarrow"]
+unit_test_extras = ["tests"] + extras
templated_files = common.py_library(
microgenerator=True,
samples=True,
- unit_test_extras=extras,
+ unit_test_extras=unit_test_extras,
system_test_extras=extras,
system_test_external_dependencies=["google-cloud-bigquery"],
cov_level=95,
@@ -178,7 +177,7 @@
)
s.replace(
- "CONTRIBUTING.rst", "remote \(``master``\)", "remote (``main``)",
+ "CONTRIBUTING.rst", "remote \\(``master``\\)", "remote (``main``)",
)
s.replace(
diff --git a/samples/quickstart/requirements.txt b/samples/quickstart/requirements.txt
index a4b9ec9e..37ab4795 100644
--- a/samples/quickstart/requirements.txt
+++ b/samples/quickstart/requirements.txt
@@ -1,2 +1,2 @@
fastavro
-google-cloud-bigquery-storage==2.6.3
+google-cloud-bigquery-storage==2.7.0
diff --git a/samples/snippets/__init__.py b/samples/snippets/__init__.py
new file mode 100644
index 00000000..0098709d
--- /dev/null
+++ b/samples/snippets/__init__.py
@@ -0,0 +1,15 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/samples/snippets/append_rows_proto2.py b/samples/snippets/append_rows_proto2.py
new file mode 100644
index 00000000..d0390993
--- /dev/null
+++ b/samples/snippets/append_rows_proto2.py
@@ -0,0 +1,249 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# [START bigquerystorage_append_rows_raw_proto2]
+"""
+This code sample demonstrates using the low-level generated client for Python.
+"""
+
+import datetime
+import decimal
+
+from google.cloud import bigquery_storage_v1beta2
+from google.cloud.bigquery_storage_v1beta2 import types
+from google.cloud.bigquery_storage_v1beta2 import writer
+from google.protobuf import descriptor_pb2
+
+# If you make updates to the sample_data.proto protocol buffers definition,
+# run:
+#
+# protoc --python_out=. sample_data.proto
+#
+# from the samples/snippets directory to generate the sample_data_pb2 module.
+from . import sample_data_pb2
+
+
+def append_rows_proto2(project_id: str, dataset_id: str, table_id: str):
+ """Create a write stream, write some sample data, and commit the stream."""
+ write_client = bigquery_storage_v1beta2.BigQueryWriteClient()
+ parent = write_client.table_path(project_id, dataset_id, table_id)
+ write_stream = types.WriteStream()
+
+ # When creating the stream, choose the type. Use the PENDING type to wait
+ # until the stream is committed before it is visible. See:
+ # https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1beta2#google.cloud.bigquery.storage.v1beta2.WriteStream.Type
+ write_stream.type_ = types.WriteStream.Type.PENDING
+ write_stream = write_client.create_write_stream(
+ parent=parent, write_stream=write_stream
+ )
+ stream_name = write_stream.name
+
+ # Create a template with fields needed for the first request.
+ request_template = types.AppendRowsRequest()
+
+ # The initial request must contain the stream name.
+ request_template.write_stream = stream_name
+
+ # So that BigQuery knows how to parse the serialized_rows, generate a
+ # protocol buffer representation of your message descriptor.
+ proto_schema = types.ProtoSchema()
+ proto_descriptor = descriptor_pb2.DescriptorProto()
+ sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
+ proto_schema.proto_descriptor = proto_descriptor
+ proto_data = types.AppendRowsRequest.ProtoData()
+ proto_data.writer_schema = proto_schema
+ request_template.proto_rows = proto_data
+
+ # Some stream types support an unbounded number of requests. Construct an
+ # AppendRowsStream to send an arbitrary number of requests to a stream.
+ append_rows_stream = writer.AppendRowsStream(write_client, request_template)
+
+ # Create a batch of row data by appending proto2 serialized bytes to the
+ # serialized_rows repeated field.
+ proto_rows = types.ProtoRows()
+
+ row = sample_data_pb2.SampleData()
+ row.row_num = 1
+ row.bool_col = True
+ row.bytes_col = b"Hello, World!"
+ row.float64_col = float("+inf")
+ row.int64_col = 123
+ row.string_col = "Howdy!"
+ proto_rows.serialized_rows.append(row.SerializeToString())
+
+ row = sample_data_pb2.SampleData()
+ row.row_num = 2
+ row.bool_col = False
+ proto_rows.serialized_rows.append(row.SerializeToString())
+
+ row = sample_data_pb2.SampleData()
+ row.row_num = 3
+ row.bytes_col = b"See you later!"
+ proto_rows.serialized_rows.append(row.SerializeToString())
+
+ row = sample_data_pb2.SampleData()
+ row.row_num = 4
+ row.float64_col = 1000000.125
+ proto_rows.serialized_rows.append(row.SerializeToString())
+
+ row = sample_data_pb2.SampleData()
+ row.row_num = 5
+ row.int64_col = 67000
+ proto_rows.serialized_rows.append(row.SerializeToString())
+
+ row = sample_data_pb2.SampleData()
+ row.row_num = 6
+ row.string_col = "Auf Wiedersehen!"
+ proto_rows.serialized_rows.append(row.SerializeToString())
+
+ # Set an offset to allow resuming this stream if the connection breaks.
+ # Keep track of which requests the server has acknowledged and resume the
+ # stream at the first non-acknowledged message. If the server has already
+ # processed a message with that offset, it will return an ALREADY_EXISTS
+ # error, which can be safely ignored.
+ #
+ # The first request must always have an offset of 0.
+ request = types.AppendRowsRequest()
+ request.offset = 0
+ proto_data = types.AppendRowsRequest.ProtoData()
+ proto_data.rows = proto_rows
+ request.proto_rows = proto_data
+
+ response_future_1 = append_rows_stream.send(request)
+
+ # Create a batch of rows containing scalar values that don't directly
+ # correspond to a protocol buffers scalar type. See the documentation for
+ # the expected data formats:
+ # https://cloud.google.com/bigquery/docs/write-api#data_type_conversions
+ proto_rows = types.ProtoRows()
+
+ row = sample_data_pb2.SampleData()
+ row.row_num = 7
+ date_value = datetime.date(2021, 8, 12)
+ epoch_value = datetime.date(1970, 1, 1)
+ delta = date_value - epoch_value
+ row.date_col = delta.days
+ proto_rows.serialized_rows.append(row.SerializeToString())
+
+ row = sample_data_pb2.SampleData()
+ row.row_num = 8
+ datetime_value = datetime.datetime(2021, 8, 12, 9, 46, 23, 987456)
+ row.datetime_col = datetime_value.strftime("%Y-%m-%d %H:%M:%S.%f")
+ proto_rows.serialized_rows.append(row.SerializeToString())
+
+ row = sample_data_pb2.SampleData()
+ row.row_num = 9
+ row.geography_col = "POINT(-122.347222 47.651111)"
+ proto_rows.serialized_rows.append(row.SerializeToString())
+
+ row = sample_data_pb2.SampleData()
+ row.row_num = 10
+ numeric_value = decimal.Decimal("1.23456789101112e+6")
+ row.numeric_col = str(numeric_value)
+ bignumeric_value = decimal.Decimal("-1.234567891011121314151617181920e+16")
+ row.bignumeric_col = str(bignumeric_value)
+ proto_rows.serialized_rows.append(row.SerializeToString())
+
+ row = sample_data_pb2.SampleData()
+ row.row_num = 11
+ time_value = datetime.time(11, 7, 48, 123456)
+ row.time_col = time_value.strftime("%H:%M:%S.%f")
+ proto_rows.serialized_rows.append(row.SerializeToString())
+
+ row = sample_data_pb2.SampleData()
+ row.row_num = 12
+ timestamp_value = datetime.datetime(
+ 2021, 8, 12, 16, 11, 22, 987654, tzinfo=datetime.timezone.utc
+ )
+ epoch_value = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
+ delta = timestamp_value - epoch_value
+ row.timestamp_col = int(delta.total_seconds()) * 1000000 + int(delta.microseconds)
+ proto_rows.serialized_rows.append(row.SerializeToString())
+
+ # Since this is the second request, you only need to include the row data.
+    # The name of the stream and the protocol buffers DESCRIPTOR are only
+    # needed in the first request.
+ request = types.AppendRowsRequest()
+ proto_data = types.AppendRowsRequest.ProtoData()
+ proto_data.rows = proto_rows
+ request.proto_rows = proto_data
+
+ # Offset must equal the number of rows that were previously sent.
+ request.offset = 6
+
+ response_future_2 = append_rows_stream.send(request)
+
+ # Create a batch of rows with STRUCT and ARRAY BigQuery data types. In
+ # protocol buffers, these correspond to nested messages and repeated
+ # fields, respectively.
+ proto_rows = types.ProtoRows()
+
+ row = sample_data_pb2.SampleData()
+ row.row_num = 13
+ row.int64_list.append(1)
+ row.int64_list.append(2)
+ row.int64_list.append(3)
+ proto_rows.serialized_rows.append(row.SerializeToString())
+
+ row = sample_data_pb2.SampleData()
+ row.row_num = 14
+ row.struct_col.sub_int_col = 7
+ proto_rows.serialized_rows.append(row.SerializeToString())
+
+ row = sample_data_pb2.SampleData()
+ row.row_num = 15
+ sub_message = sample_data_pb2.SampleData.SampleStruct()
+ sub_message.sub_int_col = -1
+ row.struct_list.append(sub_message)
+ sub_message = sample_data_pb2.SampleData.SampleStruct()
+ sub_message.sub_int_col = -2
+ row.struct_list.append(sub_message)
+ sub_message = sample_data_pb2.SampleData.SampleStruct()
+ sub_message.sub_int_col = -3
+ row.struct_list.append(sub_message)
+ proto_rows.serialized_rows.append(row.SerializeToString())
+
+ request = types.AppendRowsRequest()
+ request.offset = 12
+ proto_data = types.AppendRowsRequest.ProtoData()
+ proto_data.rows = proto_rows
+ request.proto_rows = proto_data
+
+    # For each request sent, one response message is expected from the server.
+    # This sample sends 3 requests, therefore it expects exactly 3 responses.
+ response_future_3 = append_rows_stream.send(request)
+
+ # All three requests are in-flight, wait for them to finish being processed
+ # before finalizing the stream.
+ print(response_future_1.result())
+ print(response_future_2.result())
+ print(response_future_3.result())
+
+ # Shutdown background threads and close the streaming connection.
+ append_rows_stream.close()
+
+ # A PENDING type stream must be "finalized" before being committed. No new
+ # records can be written to the stream after this method has been called.
+ write_client.finalize_write_stream(name=write_stream.name)
+
+ # Commit the stream you created earlier.
+ batch_commit_write_streams_request = types.BatchCommitWriteStreamsRequest()
+ batch_commit_write_streams_request.parent = parent
+ batch_commit_write_streams_request.write_streams = [write_stream.name]
+ write_client.batch_commit_write_streams(batch_commit_write_streams_request)
+
+ print(f"Writes to stream: '{write_stream.name}' have been committed.")
+
+
+# [END bigquerystorage_append_rows_raw_proto2]
diff --git a/samples/snippets/append_rows_proto2_test.py b/samples/snippets/append_rows_proto2_test.py
new file mode 100644
index 00000000..dddda301
--- /dev/null
+++ b/samples/snippets/append_rows_proto2_test.py
@@ -0,0 +1,126 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import decimal
+import pathlib
+import random
+
+from google.cloud import bigquery
+import pytest
+
+from . import append_rows_proto2
+
+
+DIR = pathlib.Path(__file__).parent
+
+
+regions = ["US", "non-US"]
+
+
+@pytest.fixture(params=regions)
+def sample_data_table(
+ request: pytest.FixtureRequest,
+ bigquery_client: bigquery.Client,
+ project_id: str,
+ dataset_id: str,
+ dataset_id_non_us: str,
+) -> str:
+ dataset = dataset_id
+ if request.param != "US":
+ dataset = dataset_id_non_us
+ schema = bigquery_client.schema_from_json(str(DIR / "sample_data_schema.json"))
+ table_id = f"append_rows_proto2_{random.randrange(10000)}"
+ full_table_id = f"{project_id}.{dataset}.{table_id}"
+ table = bigquery.Table(full_table_id, schema=schema)
+ table = bigquery_client.create_table(table, exists_ok=True)
+ yield full_table_id
+ bigquery_client.delete_table(table, not_found_ok=True)
+
+
+def test_append_rows_proto2(
+ capsys: pytest.CaptureFixture,
+ bigquery_client: bigquery.Client,
+ sample_data_table: str,
+):
+ project_id, dataset_id, table_id = sample_data_table.split(".")
+ append_rows_proto2.append_rows_proto2(
+ project_id=project_id, dataset_id=dataset_id, table_id=table_id
+ )
+ out, _ = capsys.readouterr()
+ assert "have been committed" in out
+
+ rows = bigquery_client.query(
+ f"SELECT * FROM `{project_id}.{dataset_id}.{table_id}`"
+ ).result()
+ row_items = [
+ # Convert to sorted tuple of items, omitting NULL values, to make
+ # searching for expected rows easier.
+ tuple(
+ sorted(
+ item for item in row.items() if item[1] is not None and item[1] != []
+ )
+ )
+ for row in rows
+ ]
+
+ assert (
+ ("bool_col", True),
+ ("bytes_col", b"Hello, World!"),
+ ("float64_col", float("+inf")),
+ ("int64_col", 123),
+ ("row_num", 1),
+ ("string_col", "Howdy!"),
+ ) in row_items
+ assert (("bool_col", False), ("row_num", 2)) in row_items
+ assert (("bytes_col", b"See you later!"), ("row_num", 3)) in row_items
+ assert (("float64_col", 1000000.125), ("row_num", 4)) in row_items
+ assert (("int64_col", 67000), ("row_num", 5)) in row_items
+ assert (("row_num", 6), ("string_col", "Auf Wiedersehen!")) in row_items
+ assert (("date_col", datetime.date(2021, 8, 12)), ("row_num", 7)) in row_items
+ assert (
+ ("datetime_col", datetime.datetime(2021, 8, 12, 9, 46, 23, 987456)),
+ ("row_num", 8),
+ ) in row_items
+ assert (
+ ("geography_col", "POINT(-122.347222 47.651111)"),
+ ("row_num", 9),
+ ) in row_items
+ assert (
+ ("bignumeric_col", decimal.Decimal("-1.234567891011121314151617181920e+16")),
+ ("numeric_col", decimal.Decimal("1.23456789101112e+6")),
+ ("row_num", 10),
+ ) in row_items
+ assert (
+ ("row_num", 11),
+ ("time_col", datetime.time(11, 7, 48, 123456)),
+ ) in row_items
+ assert (
+ ("row_num", 12),
+ (
+ "timestamp_col",
+ datetime.datetime(
+ 2021, 8, 12, 16, 11, 22, 987654, tzinfo=datetime.timezone.utc
+ ),
+ ),
+ ) in row_items
+ assert (("int64_list", [1, 2, 3]), ("row_num", 13)) in row_items
+ assert (("row_num", 14), ("struct_col", {"sub_int_col": 7}),) in row_items
+ assert (
+ ("row_num", 15),
+ (
+ "struct_list",
+ [{"sub_int_col": -1}, {"sub_int_col": -2}, {"sub_int_col": -3}],
+ ),
+ ) in row_items
diff --git a/samples/snippets/conftest.py b/samples/snippets/conftest.py
new file mode 100644
index 00000000..531f0b9d
--- /dev/null
+++ b/samples/snippets/conftest.py
@@ -0,0 +1,60 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from google.cloud import bigquery
+import pytest
+import test_utils.prefixer
+
+
+prefixer = test_utils.prefixer.Prefixer("python-bigquery-storage", "samples/snippets")
+
+
+@pytest.fixture(scope="session", autouse=True)
+def cleanup_datasets(bigquery_client: bigquery.Client):
+ for dataset in bigquery_client.list_datasets():
+ if prefixer.should_cleanup(dataset.dataset_id):
+ bigquery_client.delete_dataset(
+ dataset, delete_contents=True, not_found_ok=True
+ )
+
+
+@pytest.fixture(scope="session")
+def bigquery_client():
+ return bigquery.Client()
+
+
+@pytest.fixture(scope="session")
+def project_id(bigquery_client):
+ return bigquery_client.project
+
+
+@pytest.fixture(scope="session")
+def dataset_id(bigquery_client: bigquery.Client, project_id: str):
+ dataset_id = prefixer.create_prefix()
+ full_dataset_id = f"{project_id}.{dataset_id}"
+ dataset = bigquery.Dataset(full_dataset_id)
+ bigquery_client.create_dataset(dataset)
+ yield dataset_id
+ bigquery_client.delete_dataset(dataset, delete_contents=True, not_found_ok=True)
+
+
+@pytest.fixture(scope="session")
+def dataset_id_non_us(bigquery_client: bigquery.Client, project_id: str):
+ dataset_id = prefixer.create_prefix()
+ full_dataset_id = f"{project_id}.{dataset_id}"
+ dataset = bigquery.Dataset(full_dataset_id)
+ dataset.location = "asia-northeast1"
+ bigquery_client.create_dataset(dataset)
+ yield dataset_id
+ bigquery_client.delete_dataset(dataset, delete_contents=True, not_found_ok=True)
diff --git a/samples/snippets/noxfile.py b/samples/snippets/noxfile.py
new file mode 100644
index 00000000..b008613f
--- /dev/null
+++ b/samples/snippets/noxfile.py
@@ -0,0 +1,266 @@
+# Copyright 2019 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import os
+from pathlib import Path
+import sys
+from typing import Callable, Dict, List, Optional
+
+import nox
+
+
+# WARNING - WARNING - WARNING - WARNING - WARNING
+# WARNING - WARNING - WARNING - WARNING - WARNING
+# DO NOT EDIT THIS FILE EVER!
+# WARNING - WARNING - WARNING - WARNING - WARNING
+# WARNING - WARNING - WARNING - WARNING - WARNING
+
+BLACK_VERSION = "black==19.10b0"
+
+# Copy `noxfile_config.py` to your directory and modify it instead.
+
+# `TEST_CONFIG` dict is a configuration hook that allows users to
+# modify the test configurations. The values here should be in sync
+# with `noxfile_config.py`. Users will copy `noxfile_config.py` into
+# their directory and modify it.
+
+TEST_CONFIG = {
+ # You can opt out from the test for specific Python versions.
+ "ignored_versions": [],
+ # Old samples are opted out of enforcing Python type hints
+ # All new samples should feature them
+ "enforce_type_hints": False,
+ # An envvar key for determining the project id to use. Change it
+ # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a
+ # build specific Cloud project. You can also use your own string
+ # to use your own Cloud project.
+ "gcloud_project_env": "GOOGLE_CLOUD_PROJECT",
+ # 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT',
+ # If you need to use a specific version of pip,
+ # change pip_version_override to the string representation
+ # of the version number, for example, "20.2.4"
+ "pip_version_override": None,
+ # A dictionary you want to inject into your test. Don't put any
+ # secrets here. These values will override predefined values.
+ "envs": {},
+}
+
+
+try:
+ # Ensure we can import noxfile_config in the project's directory.
+ sys.path.append(".")
+ from noxfile_config import TEST_CONFIG_OVERRIDE
+except ImportError as e:
+ print("No user noxfile_config found: detail: {}".format(e))
+ TEST_CONFIG_OVERRIDE = {}
+
+# Update the TEST_CONFIG with the user supplied values.
+TEST_CONFIG.update(TEST_CONFIG_OVERRIDE)
+
+
+def get_pytest_env_vars() -> Dict[str, str]:
+ """Returns a dict for pytest invocation."""
+ ret = {}
+
+ # Override the GCLOUD_PROJECT and the alias.
+ env_key = TEST_CONFIG["gcloud_project_env"]
+ # This should error out if not set.
+ ret["GOOGLE_CLOUD_PROJECT"] = os.environ[env_key]
+
+ # Apply user supplied envs.
+ ret.update(TEST_CONFIG["envs"])
+ return ret
+
+
+# DO NOT EDIT - automatically generated.
+# All versions used to test samples.
+ALL_VERSIONS = ["3.6", "3.7", "3.8", "3.9"]
+
+# Any default versions that should be ignored.
+IGNORED_VERSIONS = TEST_CONFIG["ignored_versions"]
+
+TESTED_VERSIONS = sorted([v for v in ALL_VERSIONS if v not in IGNORED_VERSIONS])
+
+INSTALL_LIBRARY_FROM_SOURCE = os.environ.get("INSTALL_LIBRARY_FROM_SOURCE", False) in (
+ "True",
+ "true",
+)
+#
+# Style Checks
+#
+
+
+def _determine_local_import_names(start_dir: str) -> List[str]:
+ """Determines all import names that should be considered "local".
+
+    This is used when running the linter to ensure that import order is
+ properly checked.
+ """
+ file_ext_pairs = [os.path.splitext(path) for path in os.listdir(start_dir)]
+ return [
+ basename
+ for basename, extension in file_ext_pairs
+ if extension == ".py"
+ or os.path.isdir(os.path.join(start_dir, basename))
+ and basename not in ("__pycache__")
+ ]
+
+
+# Linting with flake8.
+#
+# We ignore the following rules:
+# E203: whitespace before ‘:’
+# E266: too many leading ‘#’ for block comment
+# E501: line too long
+# I202: Additional newline in a section of imports
+#
+# We also need to specify the rules which are ignored by default:
+# ['E226', 'W504', 'E126', 'E123', 'W503', 'E24', 'E704', 'E121']
+FLAKE8_COMMON_ARGS = [
+ "--show-source",
+ "--builtin=gettext",
+ "--max-complexity=20",
+ "--import-order-style=google",
+ "--exclude=.nox,.cache,env,lib,generated_pb2,*_pb2.py,*_pb2_grpc.py",
+ "--ignore=E121,E123,E126,E203,E226,E24,E266,E501,E704,W503,W504,I202",
+ "--max-line-length=88",
+]
+
+
+@nox.session
+def lint(session: nox.sessions.Session) -> None:
+ if not TEST_CONFIG["enforce_type_hints"]:
+ session.install("flake8", "flake8-import-order")
+ else:
+ session.install("flake8", "flake8-import-order", "flake8-annotations")
+
+ local_names = _determine_local_import_names(".")
+ args = FLAKE8_COMMON_ARGS + [
+ "--application-import-names",
+ ",".join(local_names),
+ ".",
+ ]
+ session.run("flake8", *args)
+
+
+#
+# Black
+#
+
+
+@nox.session
+def blacken(session: nox.sessions.Session) -> None:
+ session.install(BLACK_VERSION)
+ python_files = [path for path in os.listdir(".") if path.endswith(".py")]
+
+ session.run("black", *python_files)
+
+
+#
+# Sample Tests
+#
+
+
+PYTEST_COMMON_ARGS = ["--junitxml=sponge_log.xml"]
+
+
+def _session_tests(
+    session: nox.sessions.Session, post_install: Callable = None
+) -> None:
+    """Runs py.test for a particular project."""
+    if TEST_CONFIG["pip_version_override"]:
+        pip_version = TEST_CONFIG["pip_version_override"]
+        session.install(f"pip=={pip_version}")
+ if os.path.exists("requirements.txt"):
+ if os.path.exists("constraints.txt"):
+ session.install("-r", "requirements.txt", "-c", "constraints.txt")
+ else:
+ session.install("-r", "requirements.txt")
+
+ if os.path.exists("requirements-test.txt"):
+ if os.path.exists("constraints-test.txt"):
+ session.install("-r", "requirements-test.txt", "-c", "constraints-test.txt")
+ else:
+ session.install("-r", "requirements-test.txt")
+
+ if INSTALL_LIBRARY_FROM_SOURCE:
+ session.install("-e", _get_repo_root())
+
+ if post_install:
+ post_install(session)
+
+ session.run(
+ "pytest",
+ *(PYTEST_COMMON_ARGS + session.posargs),
+ # Pytest will return 5 when no tests are collected. This can happen
+ # on travis where slow and flaky tests are excluded.
+ # See http://doc.pytest.org/en/latest/_modules/_pytest/main.html
+ success_codes=[0, 5],
+ env=get_pytest_env_vars(),
+ )
+
+
+@nox.session(python=ALL_VERSIONS)
+def py(session: nox.sessions.Session) -> None:
+ """Runs py.test for a sample using the specified version of Python."""
+ if session.python in TESTED_VERSIONS:
+ _session_tests(session)
+ else:
+ session.skip(
+ "SKIPPED: {} tests are disabled for this sample.".format(session.python)
+ )
+
+
+#
+# Readmegen
+#
+
+
+def _get_repo_root() -> Optional[str]:
+ """ Returns the root folder of the project. """
+ # Get root of this repository. Assume we don't have directories nested deeper than 10 items.
+ p = Path(os.getcwd())
+ for i in range(10):
+ if p is None:
+ break
+ if Path(p / ".git").exists():
+ return str(p)
+ # .git is not available in repos cloned via Cloud Build
+ # setup.py is always in the library's root, so use that instead
+ # https://github.com/googleapis/synthtool/issues/792
+ if Path(p / "setup.py").exists():
+ return str(p)
+ p = p.parent
+ raise Exception("Unable to detect repository root.")
+
+
+GENERATED_READMES = sorted([x for x in Path(".").rglob("*.rst.in")])
+
+
+@nox.session
+@nox.parametrize("path", GENERATED_READMES)
+def readmegen(session: nox.sessions.Session, path: str) -> None:
+ """(Re-)generates the readme for a sample."""
+ session.install("jinja2", "pyyaml")
+ dir_ = os.path.dirname(path)
+
+ if os.path.exists(os.path.join(dir_, "requirements.txt")):
+ session.install("-r", os.path.join(dir_, "requirements.txt"))
+
+ in_file = os.path.join(dir_, "README.rst.in")
+ session.run(
+ "python", _get_repo_root() + "/scripts/readme-gen/readme_gen.py", in_file
+ )
diff --git a/samples/snippets/requirements-test.txt b/samples/snippets/requirements-test.txt
new file mode 100644
index 00000000..85597665
--- /dev/null
+++ b/samples/snippets/requirements-test.txt
@@ -0,0 +1,2 @@
+google-cloud-testutils==1.0.0
+pytest==6.2.4
diff --git a/samples/snippets/requirements.txt b/samples/snippets/requirements.txt
new file mode 100644
index 00000000..8df538bb
--- /dev/null
+++ b/samples/snippets/requirements.txt
@@ -0,0 +1,3 @@
+google-cloud-bigquery-storage==2.8.0
+google-cloud-bigquery==2.24.1
+protobuf==3.17.3
diff --git a/samples/snippets/sample_data.proto b/samples/snippets/sample_data.proto
new file mode 100644
index 00000000..3e9f19ce
--- /dev/null
+++ b/samples/snippets/sample_data.proto
@@ -0,0 +1,61 @@
+// Copyright 2021 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// [START bigquerystorage_append_rows_raw_proto2_definition]
+// The BigQuery Storage API expects protocol buffer data to be encoded in the
+// proto2 wire format. This allows it to disambiguate missing optional fields
+// from default values without the need for wrapper types.
+syntax = "proto2";
+
+// Define a message type representing the rows in your table. The message
+// cannot contain fields which are not present in the table.
+message SampleData {
+ // Use a nested message to encode STRUCT column values.
+ //
+ // References to external messages are not allowed. Any message definitions
+ // must be nested within the root message representing row data.
+ message SampleStruct {
+ optional int64 sub_int_col = 1;
+ }
+
+ // The following types map directly between protocol buffers and their
+ // corresponding BigQuery data types.
+ optional bool bool_col = 1;
+ optional bytes bytes_col = 2;
+ optional double float64_col = 3;
+ optional int64 int64_col = 4;
+ optional string string_col = 5;
+
+ // The following data types require some encoding to use. See the
+ // documentation for the expected data formats:
+  // https://cloud.google.com/bigquery/docs/write-api#data_type_conversions
+ optional int32 date_col = 6;
+ optional string datetime_col = 7;
+ optional string geography_col = 8;
+ optional string numeric_col = 9;
+ optional string bignumeric_col = 10;
+ optional string time_col = 11;
+ optional int64 timestamp_col = 12;
+
+ // Use a repeated field to represent a BigQuery ARRAY value.
+ repeated int64 int64_list = 13;
+
+ // Use a nested message to encode STRUCT and ARRAY values.
+ optional SampleStruct struct_col = 14;
+ repeated SampleStruct struct_list = 15;
+
+ // Use the required keyword for client-side validation of required fields.
+ required int64 row_num = 16;
+}
+// [END bigquerystorage_append_rows_raw_proto2_definition]
diff --git a/samples/snippets/sample_data_pb2.py b/samples/snippets/sample_data_pb2.py
new file mode 100644
index 00000000..ba524988
--- /dev/null
+++ b/samples/snippets/sample_data_pb2.py
@@ -0,0 +1,418 @@
+# -*- coding: utf-8 -*-
+# Generated by the protocol buffer compiler. DO NOT EDIT!
+# source: sample_data.proto
+"""Generated protocol buffer code."""
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import message as _message
+from google.protobuf import reflection as _reflection
+from google.protobuf import symbol_database as _symbol_database
+
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+DESCRIPTOR = _descriptor.FileDescriptor(
+ name="sample_data.proto",
+ package="",
+ syntax="proto2",
+ serialized_options=None,
+ create_key=_descriptor._internal_create_key,
+ serialized_pb=b'\n\x11sample_data.proto"\xa9\x03\n\nSampleData\x12\x10\n\x08\x62ool_col\x18\x01 \x01(\x08\x12\x11\n\tbytes_col\x18\x02 \x01(\x0c\x12\x13\n\x0b\x66loat64_col\x18\x03 \x01(\x01\x12\x11\n\tint64_col\x18\x04 \x01(\x03\x12\x12\n\nstring_col\x18\x05 \x01(\t\x12\x10\n\x08\x64\x61te_col\x18\x06 \x01(\x05\x12\x14\n\x0c\x64\x61tetime_col\x18\x07 \x01(\t\x12\x15\n\rgeography_col\x18\x08 \x01(\t\x12\x13\n\x0bnumeric_col\x18\t \x01(\t\x12\x16\n\x0e\x62ignumeric_col\x18\n \x01(\t\x12\x10\n\x08time_col\x18\x0b \x01(\t\x12\x15\n\rtimestamp_col\x18\x0c \x01(\x03\x12\x12\n\nint64_list\x18\r \x03(\x03\x12,\n\nstruct_col\x18\x0e \x01(\x0b\x32\x18.SampleData.SampleStruct\x12-\n\x0bstruct_list\x18\x0f \x03(\x0b\x32\x18.SampleData.SampleStruct\x12\x0f\n\x07row_num\x18\x10 \x02(\x03\x1a#\n\x0cSampleStruct\x12\x13\n\x0bsub_int_col\x18\x01 \x01(\x03',
+)
+
+
+_SAMPLEDATA_SAMPLESTRUCT = _descriptor.Descriptor(
+ name="SampleStruct",
+ full_name="SampleData.SampleStruct",
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ create_key=_descriptor._internal_create_key,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name="sub_int_col",
+ full_name="SampleData.SampleStruct.sub_int_col",
+ index=0,
+ number=1,
+ type=3,
+ cpp_type=2,
+ label=1,
+ has_default_value=False,
+ default_value=0,
+ message_type=None,
+ enum_type=None,
+ containing_type=None,
+ is_extension=False,
+ extension_scope=None,
+ serialized_options=None,
+ file=DESCRIPTOR,
+ create_key=_descriptor._internal_create_key,
+ ),
+ ],
+ extensions=[],
+ nested_types=[],
+ enum_types=[],
+ serialized_options=None,
+ is_extendable=False,
+ syntax="proto2",
+ extension_ranges=[],
+ oneofs=[],
+ serialized_start=412,
+ serialized_end=447,
+)
+
+_SAMPLEDATA = _descriptor.Descriptor(
+ name="SampleData",
+ full_name="SampleData",
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ create_key=_descriptor._internal_create_key,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name="bool_col",
+ full_name="SampleData.bool_col",
+ index=0,
+ number=1,
+ type=8,
+ cpp_type=7,
+ label=1,
+ has_default_value=False,
+ default_value=False,
+ message_type=None,
+ enum_type=None,
+ containing_type=None,
+ is_extension=False,
+ extension_scope=None,
+ serialized_options=None,
+ file=DESCRIPTOR,
+ create_key=_descriptor._internal_create_key,
+ ),
+ _descriptor.FieldDescriptor(
+ name="bytes_col",
+ full_name="SampleData.bytes_col",
+ index=1,
+ number=2,
+ type=12,
+ cpp_type=9,
+ label=1,
+ has_default_value=False,
+ default_value=b"",
+ message_type=None,
+ enum_type=None,
+ containing_type=None,
+ is_extension=False,
+ extension_scope=None,
+ serialized_options=None,
+ file=DESCRIPTOR,
+ create_key=_descriptor._internal_create_key,
+ ),
+ _descriptor.FieldDescriptor(
+ name="float64_col",
+ full_name="SampleData.float64_col",
+ index=2,
+ number=3,
+ type=1,
+ cpp_type=5,
+ label=1,
+ has_default_value=False,
+ default_value=float(0),
+ message_type=None,
+ enum_type=None,
+ containing_type=None,
+ is_extension=False,
+ extension_scope=None,
+ serialized_options=None,
+ file=DESCRIPTOR,
+ create_key=_descriptor._internal_create_key,
+ ),
+ _descriptor.FieldDescriptor(
+ name="int64_col",
+ full_name="SampleData.int64_col",
+ index=3,
+ number=4,
+ type=3,
+ cpp_type=2,
+ label=1,
+ has_default_value=False,
+ default_value=0,
+ message_type=None,
+ enum_type=None,
+ containing_type=None,
+ is_extension=False,
+ extension_scope=None,
+ serialized_options=None,
+ file=DESCRIPTOR,
+ create_key=_descriptor._internal_create_key,
+ ),
+ _descriptor.FieldDescriptor(
+ name="string_col",
+ full_name="SampleData.string_col",
+ index=4,
+ number=5,
+ type=9,
+ cpp_type=9,
+ label=1,
+ has_default_value=False,
+ default_value=b"".decode("utf-8"),
+ message_type=None,
+ enum_type=None,
+ containing_type=None,
+ is_extension=False,
+ extension_scope=None,
+ serialized_options=None,
+ file=DESCRIPTOR,
+ create_key=_descriptor._internal_create_key,
+ ),
+ _descriptor.FieldDescriptor(
+ name="date_col",
+ full_name="SampleData.date_col",
+ index=5,
+ number=6,
+ type=5,
+ cpp_type=1,
+ label=1,
+ has_default_value=False,
+ default_value=0,
+ message_type=None,
+ enum_type=None,
+ containing_type=None,
+ is_extension=False,
+ extension_scope=None,
+ serialized_options=None,
+ file=DESCRIPTOR,
+ create_key=_descriptor._internal_create_key,
+ ),
+ _descriptor.FieldDescriptor(
+ name="datetime_col",
+ full_name="SampleData.datetime_col",
+ index=6,
+ number=7,
+ type=9,
+ cpp_type=9,
+ label=1,
+ has_default_value=False,
+ default_value=b"".decode("utf-8"),
+ message_type=None,
+ enum_type=None,
+ containing_type=None,
+ is_extension=False,
+ extension_scope=None,
+ serialized_options=None,
+ file=DESCRIPTOR,
+ create_key=_descriptor._internal_create_key,
+ ),
+ _descriptor.FieldDescriptor(
+ name="geography_col",
+ full_name="SampleData.geography_col",
+ index=7,
+ number=8,
+ type=9,
+ cpp_type=9,
+ label=1,
+ has_default_value=False,
+ default_value=b"".decode("utf-8"),
+ message_type=None,
+ enum_type=None,
+ containing_type=None,
+ is_extension=False,
+ extension_scope=None,
+ serialized_options=None,
+ file=DESCRIPTOR,
+ create_key=_descriptor._internal_create_key,
+ ),
+ _descriptor.FieldDescriptor(
+ name="numeric_col",
+ full_name="SampleData.numeric_col",
+ index=8,
+ number=9,
+ type=9,
+ cpp_type=9,
+ label=1,
+ has_default_value=False,
+ default_value=b"".decode("utf-8"),
+ message_type=None,
+ enum_type=None,
+ containing_type=None,
+ is_extension=False,
+ extension_scope=None,
+ serialized_options=None,
+ file=DESCRIPTOR,
+ create_key=_descriptor._internal_create_key,
+ ),
+ _descriptor.FieldDescriptor(
+ name="bignumeric_col",
+ full_name="SampleData.bignumeric_col",
+ index=9,
+ number=10,
+ type=9,
+ cpp_type=9,
+ label=1,
+ has_default_value=False,
+ default_value=b"".decode("utf-8"),
+ message_type=None,
+ enum_type=None,
+ containing_type=None,
+ is_extension=False,
+ extension_scope=None,
+ serialized_options=None,
+ file=DESCRIPTOR,
+ create_key=_descriptor._internal_create_key,
+ ),
+ _descriptor.FieldDescriptor(
+ name="time_col",
+ full_name="SampleData.time_col",
+ index=10,
+ number=11,
+ type=9,
+ cpp_type=9,
+ label=1,
+ has_default_value=False,
+ default_value=b"".decode("utf-8"),
+ message_type=None,
+ enum_type=None,
+ containing_type=None,
+ is_extension=False,
+ extension_scope=None,
+ serialized_options=None,
+ file=DESCRIPTOR,
+ create_key=_descriptor._internal_create_key,
+ ),
+ _descriptor.FieldDescriptor(
+ name="timestamp_col",
+ full_name="SampleData.timestamp_col",
+ index=11,
+ number=12,
+ type=3,
+ cpp_type=2,
+ label=1,
+ has_default_value=False,
+ default_value=0,
+ message_type=None,
+ enum_type=None,
+ containing_type=None,
+ is_extension=False,
+ extension_scope=None,
+ serialized_options=None,
+ file=DESCRIPTOR,
+ create_key=_descriptor._internal_create_key,
+ ),
+ _descriptor.FieldDescriptor(
+ name="int64_list",
+ full_name="SampleData.int64_list",
+ index=12,
+ number=13,
+ type=3,
+ cpp_type=2,
+ label=3,
+ has_default_value=False,
+ default_value=[],
+ message_type=None,
+ enum_type=None,
+ containing_type=None,
+ is_extension=False,
+ extension_scope=None,
+ serialized_options=None,
+ file=DESCRIPTOR,
+ create_key=_descriptor._internal_create_key,
+ ),
+ _descriptor.FieldDescriptor(
+ name="struct_col",
+ full_name="SampleData.struct_col",
+ index=13,
+ number=14,
+ type=11,
+ cpp_type=10,
+ label=1,
+ has_default_value=False,
+ default_value=None,
+ message_type=None,
+ enum_type=None,
+ containing_type=None,
+ is_extension=False,
+ extension_scope=None,
+ serialized_options=None,
+ file=DESCRIPTOR,
+ create_key=_descriptor._internal_create_key,
+ ),
+ _descriptor.FieldDescriptor(
+ name="struct_list",
+ full_name="SampleData.struct_list",
+ index=14,
+ number=15,
+ type=11,
+ cpp_type=10,
+ label=3,
+ has_default_value=False,
+ default_value=[],
+ message_type=None,
+ enum_type=None,
+ containing_type=None,
+ is_extension=False,
+ extension_scope=None,
+ serialized_options=None,
+ file=DESCRIPTOR,
+ create_key=_descriptor._internal_create_key,
+ ),
+ _descriptor.FieldDescriptor(
+ name="row_num",
+ full_name="SampleData.row_num",
+ index=15,
+ number=16,
+ type=3,
+ cpp_type=2,
+ label=2,
+ has_default_value=False,
+ default_value=0,
+ message_type=None,
+ enum_type=None,
+ containing_type=None,
+ is_extension=False,
+ extension_scope=None,
+ serialized_options=None,
+ file=DESCRIPTOR,
+ create_key=_descriptor._internal_create_key,
+ ),
+ ],
+ extensions=[],
+ nested_types=[_SAMPLEDATA_SAMPLESTRUCT,],
+ enum_types=[],
+ serialized_options=None,
+ is_extendable=False,
+ syntax="proto2",
+ extension_ranges=[],
+ oneofs=[],
+ serialized_start=22,
+ serialized_end=447,
+)
+
+_SAMPLEDATA_SAMPLESTRUCT.containing_type = _SAMPLEDATA
+_SAMPLEDATA.fields_by_name["struct_col"].message_type = _SAMPLEDATA_SAMPLESTRUCT
+_SAMPLEDATA.fields_by_name["struct_list"].message_type = _SAMPLEDATA_SAMPLESTRUCT
+DESCRIPTOR.message_types_by_name["SampleData"] = _SAMPLEDATA
+_sym_db.RegisterFileDescriptor(DESCRIPTOR)
+
+SampleData = _reflection.GeneratedProtocolMessageType(
+ "SampleData",
+ (_message.Message,),
+ {
+ "SampleStruct": _reflection.GeneratedProtocolMessageType(
+ "SampleStruct",
+ (_message.Message,),
+ {
+ "DESCRIPTOR": _SAMPLEDATA_SAMPLESTRUCT,
+ "__module__": "sample_data_pb2"
+ # @@protoc_insertion_point(class_scope:SampleData.SampleStruct)
+ },
+ ),
+ "DESCRIPTOR": _SAMPLEDATA,
+ "__module__": "sample_data_pb2"
+ # @@protoc_insertion_point(class_scope:SampleData)
+ },
+)
+_sym_db.RegisterMessage(SampleData)
+_sym_db.RegisterMessage(SampleData.SampleStruct)
+
+
+# @@protoc_insertion_point(module_scope)
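
The generated module above registers SampleData (with its nested SampleStruct) in the default symbol database, so sample code can build rows like any hand-written protobuf class. A minimal sketch, assuming the generated file is importable as sample_data_pb2; the field values are illustrative:

    from sample_data_pb2 import SampleData

    row = SampleData()
    row.row_num = 1              # the only REQUIRED (label=2) field
    row.bool_col = True
    row.string_col = "hello"
    row.int64_list.extend([10, 20])
    row.struct_col.sub_int_col = 7
    row.struct_list.add(sub_int_col=8)

    # Proto2 serialization succeeds because the required field is set;
    # the resulting bytes are what ProtoRows.serialized_rows expects.
    serialized = row.SerializeToString()
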
diff --git a/samples/snippets/sample_data_schema.json b/samples/snippets/sample_data_schema.json
new file mode 100644
index 00000000..ba6ba102
--- /dev/null
+++ b/samples/snippets/sample_data_schema.json
@@ -0,0 +1,76 @@
+
+[
+ {
+ "name": "bool_col",
+ "type": "BOOLEAN"
+ },
+ {
+ "name": "bytes_col",
+ "type": "BYTES"
+ },
+ {
+ "name": "date_col",
+ "type": "DATE"
+ },
+ {
+ "name": "datetime_col",
+ "type": "DATETIME"
+ },
+ {
+ "name": "float64_col",
+ "type": "FLOAT"
+ },
+ {
+ "name": "geography_col",
+ "type": "GEOGRAPHY"
+ },
+ {
+ "name": "int64_col",
+ "type": "INTEGER"
+ },
+ {
+ "name": "numeric_col",
+ "type": "NUMERIC"
+ },
+ {
+ "name": "bignumeric_col",
+ "type": "BIGNUMERIC"
+ },
+ {
+ "name": "row_num",
+ "type": "INTEGER",
+ "mode": "REQUIRED"
+ },
+ {
+ "name": "string_col",
+ "type": "STRING"
+ },
+ {
+ "name": "time_col",
+ "type": "TIME"
+ },
+ {
+ "name": "timestamp_col",
+ "type": "TIMESTAMP"
+ },
+ {
+ "name": "int64_list",
+ "type": "INTEGER",
+ "mode": "REPEATED"
+ },
+ {
+ "name": "struct_col",
+ "type": "RECORD",
+ "fields": [
+ {"name": "sub_int_col", "type": "INTEGER"}
+ ]
+ },
+ {
+ "name": "struct_list",
+ "type": "RECORD",
+ "fields": [
+ {"name": "sub_int_col", "type": "INTEGER"}
+ ],
+ "mode": "REPEATED"
+ }
+ ]
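
The JSON schema mirrors the proto definition field-for-field. A hedged sketch of turning it into the destination table with the google-cloud-bigquery helper schema_from_json; the project, dataset, and table names are placeholders:

    from google.cloud import bigquery

    bq_client = bigquery.Client()
    schema = bq_client.schema_from_json("samples/snippets/sample_data_schema.json")

    # Placeholder table ID; substitute a real project and dataset.
    table = bigquery.Table("my-project.my_dataset.sample_data", schema=schema)
    table = bq_client.create_table(table)
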
diff --git a/samples/to_dataframe/requirements.txt b/samples/to_dataframe/requirements.txt
index d47ff23d..063a02b2 100644
--- a/samples/to_dataframe/requirements.txt
+++ b/samples/to_dataframe/requirements.txt
@@ -1,8 +1,9 @@
google-auth==2.0.2
-google-cloud-bigquery-storage==2.6.3
-google-cloud-bigquery==2.24.1
+google-cloud-bigquery-storage==2.7.0
+google-cloud-bigquery==2.26.0
pyarrow==5.0.0
ipython==7.24.0; python_version > '3.6'
ipython==7.16.1; python_version <= '3.6'
pandas==1.2.5; python_version > '3.6'
pandas==1.1.5; python_version <= '3.6'
+tqdm==4.62.1
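
The new tqdm pin presumably backs the progress-bar option that the to_dataframe samples can pass to the BigQuery client. A minimal sketch with an illustrative query:

    from google.cloud import bigquery

    client = bigquery.Client()
    query_job = client.query("SELECT x FROM UNNEST(GENERATE_ARRAY(1, 1000)) AS x")

    # progress_bar_type="tqdm" requires the tqdm package pinned above.
    df = query_job.to_dataframe(progress_bar_type="tqdm")
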
diff --git a/setup.py b/setup.py
index 9775f76f..a74f3e89 100644
--- a/setup.py
+++ b/setup.py
@@ -21,21 +21,22 @@
name = "google-cloud-bigquery-storage"
description = "BigQuery Storage API API client library"
-version = "2.7.0"
+version = "2.8.0"
release_status = "Development Status :: 5 - Production/Stable"
dependencies = [
# NOTE: Maintainers, please do not require google-api-core>=2.x.x
# Until this issue is closed
# https://github.com/googleapis/google-cloud-python/issues/10566
"google-api-core[grpc] >= 1.26.0, <3.0.0dev",
- "proto-plus >= 1.4.0",
+ "proto-plus >= 1.18.0",
"packaging >= 14.3",
"libcst >= 0.2.5",
]
extras = {
- "pandas": "pandas>=0.21.1",
- "fastavro": "fastavro>=0.21.2",
- "pyarrow": "pyarrow>=0.15.0",
+ "pandas": ["pandas>=0.21.1"],
+ "fastavro": ["fastavro>=0.21.2"],
+ "pyarrow": ["pyarrow>=0.15.0"],
+ "tests": ["freezegun"],
}
package_root = os.path.abspath(os.path.dirname(__file__))
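
Besides normalizing the extras to lists and adding a tests extra (freezegun is used by the new unit tests below), this hunk bumps the proto-plus floor to 1.18.0. A plausible reason is the copy_from helper proto-plus added in that release, which lets the writer seed each request from a template; a sketch of its semantics, with an illustrative stream name:

    from google.cloud.bigquery_storage_v1beta2 import types as gapic_types

    template = gapic_types.AppendRowsRequest(write_stream="template-stream")
    request = gapic_types.AppendRowsRequest()

    # copy_from copies the template's state into the fresh request in place.
    gapic_types.AppendRowsRequest.copy_from(request, template)
    assert request.write_stream == "template-stream"
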
diff --git a/testing/constraints-3.6.txt b/testing/constraints-3.6.txt
index f9186709..eb1840b7 100644
--- a/testing/constraints-3.6.txt
+++ b/testing/constraints-3.6.txt
@@ -6,7 +6,7 @@
# e.g., if setup.py has "foo >= 1.14.0, < 2.0.0dev",
# Then this file should have foo==1.14.0
google-api-core==1.26.0
-proto-plus==1.4.0
+proto-plus==1.18.0
libcst==0.2.5
fastavro==0.21.2
pandas==0.21.1
diff --git a/tests/system/conftest.py b/tests/system/conftest.py
index 3a89097a..800da736 100644
--- a/tests/system/conftest.py
+++ b/tests/system/conftest.py
@@ -108,27 +108,6 @@ def dataset(project_id, bq_client):
bq_client.delete_dataset(dataset, delete_contents=True)
-@pytest.fixture
-def table(project_id, dataset, bq_client):
- from google.cloud import bigquery
-
- schema = [
- bigquery.SchemaField("first_name", "STRING", mode="NULLABLE"),
- bigquery.SchemaField("last_name", "STRING", mode="NULLABLE"),
- bigquery.SchemaField("age", "INTEGER", mode="NULLABLE"),
- ]
-
- unique_suffix = str(uuid.uuid4()).replace("-", "_")
- table_id = "users_" + unique_suffix
- table_id_full = f"{project_id}.{dataset.dataset_id}.{table_id}"
- bq_table = bigquery.Table(table_id_full, schema=schema)
- created_table = bq_client.create_table(bq_table)
-
- yield created_table
-
- bq_client.delete_table(created_table)
-
-
@pytest.fixture(scope="session")
def bq_client(credentials, use_mtls):
if use_mtls:
diff --git a/tests/system/reader/test_reader.py b/tests/system/reader/test_reader.py
index aae3f6ec..24905c59 100644
--- a/tests/system/reader/test_reader.py
+++ b/tests/system/reader/test_reader.py
@@ -19,6 +19,7 @@
import datetime as dt
import decimal
import re
+import uuid
from google.cloud import bigquery
import pytest
@@ -30,6 +31,27 @@
_TABLE_FORMAT = "projects/{}/datasets/{}/tables/{}"
+@pytest.fixture
+def table(project_id, dataset, bq_client):
+ from google.cloud import bigquery
+
+ schema = [
+ bigquery.SchemaField("first_name", "STRING", mode="NULLABLE"),
+ bigquery.SchemaField("last_name", "STRING", mode="NULLABLE"),
+ bigquery.SchemaField("age", "INTEGER", mode="NULLABLE"),
+ ]
+
+ unique_suffix = str(uuid.uuid4()).replace("-", "_")
+ table_id = "users_" + unique_suffix
+ table_id_full = f"{project_id}.{dataset.dataset_id}.{table_id}"
+ bq_table = bigquery.Table(table_id_full, schema=schema)
+ created_table = bq_client.create_table(bq_table)
+
+ yield created_table
+
+ bq_client.delete_table(created_table)
+
+
def _to_bq_table_ref(table_name_string, partition_suffix=""):
"""Converts protobuf table reference to bigquery table reference.
diff --git a/tests/system/test_writer.py b/tests/system/test_writer.py
new file mode 100644
index 00000000..a8d119d0
--- /dev/null
+++ b/tests/system/test_writer.py
@@ -0,0 +1,33 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from google.api_core import exceptions
+import pytest
+
+from google.cloud.bigquery_storage_v1beta2 import types as gapic_types
+
+
+@pytest.fixture(scope="session")
+def bqstorage_write_client(credentials):
+ from google.cloud import bigquery_storage_v1beta2
+
+ return bigquery_storage_v1beta2.BigQueryWriteClient(credentials=credentials)
+
+
+def test_append_rows_with_invalid_stream_name_fails_fast(bqstorage_write_client):
+ bad_request = gapic_types.AppendRowsRequest()
+ bad_request.write_stream = "this-is-an-invalid-stream-resource-path"
+
+ with pytest.raises(exceptions.GoogleAPICallError):
+ bqstorage_write_client.append_rows(bad_request)
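
This system test only exercises the fast-failure path. For contrast, a hedged sketch of the happy path through the new writer helper, using the open() surface that the unit tests below exercise; the stream path, schema, and row bytes are placeholders that a real caller must fill in:

    from google.cloud import bigquery_storage_v1beta2
    from google.cloud.bigquery_storage_v1beta2 import types as gapic_types
    from google.cloud.bigquery_storage_v1beta2 import writer
    from google.protobuf import descriptor_pb2

    write_client = bigquery_storage_v1beta2.BigQueryWriteClient()

    # Template carrying the stream name and writer schema; the placeholder
    # path and empty DescriptorProto must be replaced with real values.
    template = gapic_types.AppendRowsRequest(
        write_stream="projects/PROJECT/datasets/DATASET/tables/TABLE/_default",
        proto_rows=gapic_types.AppendRowsRequest.ProtoData(
            writer_schema=gapic_types.ProtoSchema(
                proto_descriptor=descriptor_pb2.DescriptorProto()
            )
        ),
    )
    stream = writer.AppendRowsStream(write_client, template)

    # The first request carries serialized proto rows; open() returns an
    # AppendRowsFuture whose result() is the server's AppendRowsResponse.
    rows = gapic_types.ProtoRows(serialized_rows=[b"<serialized row bytes>"])
    first_request = gapic_types.AppendRowsRequest(
        proto_rows=gapic_types.AppendRowsRequest.ProtoData(rows=rows)
    )
    response = stream.open(first_request).result()
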
diff --git a/tests/unit/test_writer_v1beta2.py b/tests/unit/test_writer_v1beta2.py
new file mode 100644
index 00000000..7da7c66a
--- /dev/null
+++ b/tests/unit/test_writer_v1beta2.py
@@ -0,0 +1,147 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from unittest import mock
+
+import freezegun
+import pytest
+
+from google.api_core import exceptions
+from google.cloud.bigquery_storage_v1beta2.services import big_query_write
+from google.cloud.bigquery_storage_v1beta2 import types as gapic_types
+from google.protobuf import descriptor_pb2
+
+
+REQUEST_TEMPLATE = gapic_types.AppendRowsRequest()
+
+
+@pytest.fixture(scope="module")
+def module_under_test():
+ from google.cloud.bigquery_storage_v1beta2 import writer
+
+ return writer
+
+
+def test_constructor_and_default_state(module_under_test):
+ mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient)
+ manager = module_under_test.AppendRowsStream(mock_client, REQUEST_TEMPLATE)
+
+ # Public state
+ assert manager.is_active is False
+
+ # Private state
+ assert manager._client is mock_client
+
+
+@mock.patch("google.api_core.bidi.BidiRpc", autospec=True)
+@mock.patch("google.api_core.bidi.BackgroundConsumer", autospec=True)
+def test_open(background_consumer, bidi_rpc, module_under_test):
+ mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient)
+ request_template = gapic_types.AppendRowsRequest(
+ write_stream="stream-name-from-REQUEST_TEMPLATE",
+ offset=0,
+ proto_rows=gapic_types.AppendRowsRequest.ProtoData(
+ writer_schema=gapic_types.ProtoSchema(
+ proto_descriptor=descriptor_pb2.DescriptorProto()
+ )
+ ),
+ )
+ manager = module_under_test.AppendRowsStream(mock_client, request_template)
+ type(bidi_rpc.return_value).is_active = mock.PropertyMock(
+ return_value=(False, True)
+ )
+ proto_rows = gapic_types.ProtoRows()
+ proto_rows.serialized_rows.append(b"hello, world")
+ initial_request = gapic_types.AppendRowsRequest(
+ write_stream="this-is-a-stream-resource-path",
+ offset=42,
+ proto_rows=gapic_types.AppendRowsRequest.ProtoData(rows=proto_rows),
+ )
+
+ future = manager.open(initial_request)
+
+ assert isinstance(future, module_under_test.AppendRowsFuture)
+ background_consumer.assert_called_once_with(manager._rpc, manager._on_response)
+ background_consumer.return_value.start.assert_called_once()
+ assert manager._consumer == background_consumer.return_value
+
+    # Make sure the request template and the first request are merged as
+    # expected, taking particular care that nested properties such as
+    # writer_schema and rows survive the merge.
+ expected_request = gapic_types.AppendRowsRequest(
+ write_stream="this-is-a-stream-resource-path",
+ offset=42,
+ proto_rows=gapic_types.AppendRowsRequest.ProtoData(
+ writer_schema=gapic_types.ProtoSchema(
+ proto_descriptor=descriptor_pb2.DescriptorProto()
+ ),
+ rows=proto_rows,
+ ),
+ )
+ bidi_rpc.assert_called_once_with(
+ start_rpc=mock_client.append_rows,
+ initial_request=expected_request,
+ # Extra header is required to route requests to the correct location.
+ metadata=(
+ ("x-goog-request-params", "write_stream=this-is-a-stream-resource-path"),
+ ),
+ )
+
+ bidi_rpc.return_value.add_done_callback.assert_called_once_with(
+ manager._on_rpc_done
+ )
+ assert manager._rpc == bidi_rpc.return_value
+
+ manager._consumer.is_active = True
+ assert manager.is_active is True
+
+
+@mock.patch("google.api_core.bidi.BidiRpc", autospec=True)
+@mock.patch("google.api_core.bidi.BackgroundConsumer", autospec=True)
+def test_open_with_timeout(background_consumer, bidi_rpc, module_under_test):
+ mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient)
+ manager = module_under_test.AppendRowsStream(mock_client, REQUEST_TEMPLATE)
+ type(bidi_rpc.return_value).is_active = mock.PropertyMock(return_value=False)
+ type(background_consumer.return_value).is_active = mock.PropertyMock(
+ return_value=False
+ )
+ initial_request = gapic_types.AppendRowsRequest(
+ write_stream="this-is-a-stream-resource-path"
+ )
+
+ with pytest.raises(exceptions.Unknown), freezegun.freeze_time(auto_tick_seconds=1):
+ manager.open(initial_request, timeout=0.5)
+
+
+def test_future_done_false(module_under_test):
+ mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient)
+ manager = module_under_test.AppendRowsStream(mock_client, REQUEST_TEMPLATE)
+ future = module_under_test.AppendRowsFuture(manager)
+ assert not future.done()
+
+
+def test_future_done_true_with_result(module_under_test):
+ mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient)
+ manager = module_under_test.AppendRowsStream(mock_client, REQUEST_TEMPLATE)
+ future = module_under_test.AppendRowsFuture(manager)
+ future.set_result(object())
+ assert future.done()
+
+
+def test_future_done_true_with_exception(module_under_test):
+ mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient)
+ manager = module_under_test.AppendRowsStream(mock_client, REQUEST_TEMPLATE)
+ future = module_under_test.AppendRowsFuture(manager)
+ future.set_exception(ValueError())
+ assert future.done()
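
The timeout test above leans on freezegun's auto_tick_seconds so that no real waiting happens: every clock read inside the frozen block advances time by a fixed step. A self-contained sketch of the same pattern; the polling helper is illustrative, not the writer's actual wait loop:

    import time

    import freezegun


    def wait_for(predicate, timeout):
        """Poll predicate until it returns True or timeout seconds elapse."""
        deadline = time.time() + timeout
        while not predicate():
            if time.time() >= deadline:
                raise TimeoutError("gave up waiting")


    # Each time.time() call ticks the frozen clock forward one second, so a
    # 0.5-second timeout expires on the first iteration with no real sleep.
    with freezegun.freeze_time(auto_tick_seconds=1):
        try:
            wait_for(lambda: False, timeout=0.5)
        except TimeoutError:
            print("timed out deterministically")
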