diff --git a/.github/.OwlBot.lock.yaml b/.github/.OwlBot.lock.yaml index bb21147e4..b8edda51c 100644 --- a/.github/.OwlBot.lock.yaml +++ b/.github/.OwlBot.lock.yaml @@ -1,4 +1,4 @@ -# Copyright 2022 Google LLC +# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,4 +13,4 @@ # limitations under the License. docker: image: gcr.io/cloud-devrel-public-resources/owlbot-python:latest - digest: sha256:3abfa0f1886adaf0b83f07cb117b24a639ea1cb9cffe56d43280b977033563eb + digest: sha256:2e247c7bf5154df7f98cce087a20ca7605e236340c7d6d1a14447e5c06791bd6 diff --git a/.kokoro/requirements.in b/.kokoro/requirements.in index cbd7e77f4..ec867d9fd 100644 --- a/.kokoro/requirements.in +++ b/.kokoro/requirements.in @@ -1,10 +1,10 @@ gcp-docuploader -gcp-releasetool +gcp-releasetool>=1.10.5 # required for compatibility with cryptography>=39.x importlib-metadata typing-extensions twine wheel setuptools -nox +nox>=2022.11.21 # required to remove dependency on py charset-normalizer<3 click<8.1.0 diff --git a/.kokoro/requirements.txt b/.kokoro/requirements.txt index 9c1b9be34..66a2172a7 100644 --- a/.kokoro/requirements.txt +++ b/.kokoro/requirements.txt @@ -1,6 +1,6 @@ # -# This file is autogenerated by pip-compile with python 3.10 -# To update, run: +# This file is autogenerated by pip-compile with Python 3.9 +# by the following command: # # pip-compile --allow-unsafe --generate-hashes requirements.in # @@ -20,9 +20,9 @@ cachetools==5.2.0 \ --hash=sha256:6a94c6402995a99c3970cc7e4884bb60b4a8639938157eeed436098bf9831757 \ --hash=sha256:f9f17d2aec496a9aa6b76f53e3b614c965223c061982d434d160f930c698a9db # via google-auth -certifi==2022.9.24 \ - --hash=sha256:0d9c601124e5a6ba9712dbc60d9c53c21e34f5f641fe83002317394311bdce14 \ - --hash=sha256:90c1a32f1d68f940488354e36370f6cca89f0f106db09518524c88d6ed83f382 +certifi==2022.12.7 \ + --hash=sha256:35824b4c3a97115964b408844d64aa14db1cc518f6562e8d7261699d1350a9e3 \ + --hash=sha256:4ad3232f5e926d6718ec31cfc1fcadfde020920e278684144551c91769c7bc18 # via requests cffi==1.15.1 \ --hash=sha256:00a9ed42e88df81ffae7a8ab6d9356b371399b91dbdf0c3cb1e84c03a13aceb5 \ @@ -113,33 +113,28 @@ commonmark==0.9.1 \ --hash=sha256:452f9dc859be7f06631ddcb328b6919c67984aca654e5fefb3914d54691aed60 \ --hash=sha256:da2f38c92590f83de410ba1a3cbceafbc74fee9def35f9251ba9a971d6d66fd9 # via rich -cryptography==38.0.3 \ - --hash=sha256:068147f32fa662c81aebab95c74679b401b12b57494872886eb5c1139250ec5d \ - --hash=sha256:06fc3cc7b6f6cca87bd56ec80a580c88f1da5306f505876a71c8cfa7050257dd \ - --hash=sha256:25c1d1f19729fb09d42e06b4bf9895212292cb27bb50229f5aa64d039ab29146 \ - --hash=sha256:402852a0aea73833d982cabb6d0c3bb582c15483d29fb7085ef2c42bfa7e38d7 \ - --hash=sha256:4e269dcd9b102c5a3d72be3c45d8ce20377b8076a43cbed6f660a1afe365e436 \ - --hash=sha256:5419a127426084933076132d317911e3c6eb77568a1ce23c3ac1e12d111e61e0 \ - --hash=sha256:554bec92ee7d1e9d10ded2f7e92a5d70c1f74ba9524947c0ba0c850c7b011828 \ - --hash=sha256:5e89468fbd2fcd733b5899333bc54d0d06c80e04cd23d8c6f3e0542358c6060b \ - --hash=sha256:65535bc550b70bd6271984d9863a37741352b4aad6fb1b3344a54e6950249b55 \ - --hash=sha256:6ab9516b85bebe7aa83f309bacc5f44a61eeb90d0b4ec125d2d003ce41932d36 \ - --hash=sha256:6addc3b6d593cd980989261dc1cce38263c76954d758c3c94de51f1e010c9a50 \ - --hash=sha256:728f2694fa743a996d7784a6194da430f197d5c58e2f4e278612b359f455e4a2 \ - --hash=sha256:785e4056b5a8b28f05a533fab69febf5004458e20dad7e2e13a3120d8ecec75a \ - --hash=sha256:78cf5eefac2b52c10398a42765bfa981ce2372cbc0457e6bf9658f41ec3c41d8 \ - --hash=sha256:7f836217000342d448e1c9a342e9163149e45d5b5eca76a30e84503a5a96cab0 \ - --hash=sha256:8d41a46251bf0634e21fac50ffd643216ccecfaf3701a063257fe0b2be1b6548 \ - --hash=sha256:984fe150f350a3c91e84de405fe49e688aa6092b3525f407a18b9646f6612320 \ - --hash=sha256:9b24bcff7853ed18a63cfb0c2b008936a9554af24af2fb146e16d8e1aed75748 \ - --hash=sha256:b1b35d9d3a65542ed2e9d90115dfd16bbc027b3f07ee3304fc83580f26e43249 \ - --hash=sha256:b1b52c9e5f8aa2b802d48bd693190341fae201ea51c7a167d69fc48b60e8a959 \ - --hash=sha256:bbf203f1a814007ce24bd4d51362991d5cb90ba0c177a9c08825f2cc304d871f \ - --hash=sha256:be243c7e2bfcf6cc4cb350c0d5cdf15ca6383bbcb2a8ef51d3c9411a9d4386f0 \ - --hash=sha256:bfbe6ee19615b07a98b1d2287d6a6073f734735b49ee45b11324d85efc4d5cbd \ - --hash=sha256:c46837ea467ed1efea562bbeb543994c2d1f6e800785bd5a2c98bc096f5cb220 \ - --hash=sha256:dfb4f4dd568de1b6af9f4cda334adf7d72cf5bc052516e1b2608b683375dd95c \ - --hash=sha256:ed7b00096790213e09eb11c97cc6e2b757f15f3d2f85833cd2d3ec3fe37c1722 +cryptography==39.0.1 \ + --hash=sha256:0f8da300b5c8af9f98111ffd512910bc792b4c77392a9523624680f7956a99d4 \ + --hash=sha256:35f7c7d015d474f4011e859e93e789c87d21f6f4880ebdc29896a60403328f1f \ + --hash=sha256:5aa67414fcdfa22cf052e640cb5ddc461924a045cacf325cd164e65312d99502 \ + --hash=sha256:5d2d8b87a490bfcd407ed9d49093793d0f75198a35e6eb1a923ce1ee86c62b41 \ + --hash=sha256:6687ef6d0a6497e2b58e7c5b852b53f62142cfa7cd1555795758934da363a965 \ + --hash=sha256:6f8ba7f0328b79f08bdacc3e4e66fb4d7aab0c3584e0bd41328dce5262e26b2e \ + --hash=sha256:706843b48f9a3f9b9911979761c91541e3d90db1ca905fd63fee540a217698bc \ + --hash=sha256:807ce09d4434881ca3a7594733669bd834f5b2c6d5c7e36f8c00f691887042ad \ + --hash=sha256:83e17b26de248c33f3acffb922748151d71827d6021d98c70e6c1a25ddd78505 \ + --hash=sha256:96f1157a7c08b5b189b16b47bc9db2332269d6680a196341bf30046330d15388 \ + --hash=sha256:aec5a6c9864be7df2240c382740fcf3b96928c46604eaa7f3091f58b878c0bb6 \ + --hash=sha256:b0afd054cd42f3d213bf82c629efb1ee5f22eba35bf0eec88ea9ea7304f511a2 \ + --hash=sha256:ced4e447ae29ca194449a3f1ce132ded8fcab06971ef5f618605aacaa612beac \ + --hash=sha256:d1f6198ee6d9148405e49887803907fe8962a23e6c6f83ea7d98f1c0de375695 \ + --hash=sha256:e124352fd3db36a9d4a21c1aa27fd5d051e621845cb87fb851c08f4f75ce8be6 \ + --hash=sha256:e422abdec8b5fa8462aa016786680720d78bdce7a30c652b7fadf83a4ba35336 \ + --hash=sha256:ef8b72fa70b348724ff1218267e7f7375b8de4e8194d1636ee60510aae104cd0 \ + --hash=sha256:f0c64d1bd842ca2633e74a1a28033d139368ad959872533b1bab8c80e8240a0c \ + --hash=sha256:f24077a3b5298a5a06a8e0536e3ea9ec60e4c7ac486755e5fb6e6ea9b3500106 \ + --hash=sha256:fdd188c8a6ef8769f148f88f859884507b954cc64db6b52f66ef199bb9ad660a \ + --hash=sha256:fe913f20024eb2cb2f323e42a64bdf2911bb9738a15dba7d3cce48151034e3a8 # via # gcp-releasetool # secretstorage @@ -159,9 +154,9 @@ gcp-docuploader==0.6.4 \ --hash=sha256:01486419e24633af78fd0167db74a2763974765ee8078ca6eb6964d0ebd388af \ --hash=sha256:70861190c123d907b3b067da896265ead2eeb9263969d6955c9e0bb091b5ccbf # via -r requirements.in -gcp-releasetool==1.10.0 \ - --hash=sha256:72a38ca91b59c24f7e699e9227c90cbe4dd71b789383cb0164b088abae294c83 \ - --hash=sha256:8c7c99320208383d4bb2b808c6880eb7a81424afe7cdba3c8d84b25f4f0e097d +gcp-releasetool==1.10.5 \ + --hash=sha256:174b7b102d704b254f2a26a3eda2c684fd3543320ec239baf771542a2e58e109 \ + --hash=sha256:e29d29927fe2ca493105a82958c6873bb2b90d503acac56be2c229e74de0eec9 # via -r requirements.in google-api-core==2.10.2 \ --hash=sha256:10c06f7739fe57781f87523375e8e1a3a4674bf6392cd6131a3222182b971320 \ @@ -340,9 +335,9 @@ more-itertools==9.0.0 \ --hash=sha256:250e83d7e81d0c87ca6bd942e6aeab8cc9daa6096d12c5308f3f92fa5e5c1f41 \ --hash=sha256:5a6257e40878ef0520b1803990e3e22303a41b5714006c32a3fd8304b26ea1ab # via jaraco-classes -nox==2022.8.7 \ - --hash=sha256:1b894940551dc5c389f9271d197ca5d655d40bdc6ccf93ed6880e4042760a34b \ - --hash=sha256:96cca88779e08282a699d672258ec01eb7c792d35bbbf538c723172bce23212c +nox==2022.11.21 \ + --hash=sha256:0e41a990e290e274cb205a976c4c97ee3c5234441a8132c8c3fd9ea3c22149eb \ + --hash=sha256:e21c31de0711d1274ca585a2c5fde36b1aa962005ba8e9322bf5eeed16dcd684 # via -r requirements.in packaging==21.3 \ --hash=sha256:dd47c42927d89ab911e606518907cc2d3a1f38bbd026385970643f9c5b8ecfeb \ @@ -385,10 +380,6 @@ protobuf==3.20.3 \ # gcp-docuploader # gcp-releasetool # google-api-core -py==1.11.0 \ - --hash=sha256:51c75c4126074b472f746a24399ad32f6053d1b34b68d2fa41e558e6f4a98719 \ - --hash=sha256:607c53218732647dff4acdfcd50cb62615cedf612e72d1724fb1a0cc6405b378 - # via nox pyasn1==0.4.8 \ --hash=sha256:39c7e2ec30515947ff4e87fb6f456dfc6e84857d34be479c9d4a4ba4bf46aa5d \ --hash=sha256:aef77c9fb94a3ac588e87841208bdec464471d9871bd5050a287cc9a475cd0ba diff --git a/.kokoro/samples/python3.11/common.cfg b/.kokoro/samples/python3.11/common.cfg new file mode 100644 index 000000000..f9443bb73 --- /dev/null +++ b/.kokoro/samples/python3.11/common.cfg @@ -0,0 +1,40 @@ +# Format: //devtools/kokoro/config/proto/build.proto + +# Build logs will be here +action { + define_artifacts { + regex: "**/*sponge_log.xml" + } +} + +# Specify which tests to run +env_vars: { + key: "RUN_TESTS_SESSION" + value: "py-3.11" +} + +# Declare build specific Cloud project. +env_vars: { + key: "BUILD_SPECIFIC_GCLOUD_PROJECT" + value: "python-docs-samples-tests-311" +} + +env_vars: { + key: "TRAMPOLINE_BUILD_FILE" + value: "github/python-storage/.kokoro/test-samples.sh" +} + +# Configure the docker image for kokoro-trampoline. +env_vars: { + key: "TRAMPOLINE_IMAGE" + value: "gcr.io/cloud-devrel-kokoro-resources/python-samples-testing-docker" +} + +# Download secrets for samples +gfile_resources: "/bigstore/cloud-devrel-kokoro-resources/python-docs-samples" + +# Download trampoline resources. +gfile_resources: "/bigstore/cloud-devrel-kokoro-resources/trampoline" + +# Use the trampoline script to run in docker. +build_file: "python-storage/.kokoro/trampoline_v2.sh" \ No newline at end of file diff --git a/.kokoro/samples/python3.11/continuous.cfg b/.kokoro/samples/python3.11/continuous.cfg new file mode 100644 index 000000000..a1c8d9759 --- /dev/null +++ b/.kokoro/samples/python3.11/continuous.cfg @@ -0,0 +1,6 @@ +# Format: //devtools/kokoro/config/proto/build.proto + +env_vars: { + key: "INSTALL_LIBRARY_FROM_SOURCE" + value: "True" +} \ No newline at end of file diff --git a/.kokoro/samples/python3.11/periodic-head.cfg b/.kokoro/samples/python3.11/periodic-head.cfg new file mode 100644 index 000000000..5d0faf58f --- /dev/null +++ b/.kokoro/samples/python3.11/periodic-head.cfg @@ -0,0 +1,11 @@ +# Format: //devtools/kokoro/config/proto/build.proto + +env_vars: { + key: "INSTALL_LIBRARY_FROM_SOURCE" + value: "True" +} + +env_vars: { + key: "TRAMPOLINE_BUILD_FILE" + value: "github/python-storage/.kokoro/test-samples-against-head.sh" +} diff --git a/.kokoro/samples/python3.11/periodic.cfg b/.kokoro/samples/python3.11/periodic.cfg new file mode 100644 index 000000000..71cd1e597 --- /dev/null +++ b/.kokoro/samples/python3.11/periodic.cfg @@ -0,0 +1,6 @@ +# Format: //devtools/kokoro/config/proto/build.proto + +env_vars: { + key: "INSTALL_LIBRARY_FROM_SOURCE" + value: "False" +} diff --git a/.kokoro/samples/python3.11/presubmit.cfg b/.kokoro/samples/python3.11/presubmit.cfg new file mode 100644 index 000000000..a1c8d9759 --- /dev/null +++ b/.kokoro/samples/python3.11/presubmit.cfg @@ -0,0 +1,6 @@ +# Format: //devtools/kokoro/config/proto/build.proto + +env_vars: { + key: "INSTALL_LIBRARY_FROM_SOURCE" + value: "True" +} \ No newline at end of file diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 46d237160..5405cc8ff 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -25,7 +25,7 @@ repos: rev: 22.3.0 hooks: - id: black -- repo: https://gitlab.com/pycqa/flake8 +- repo: https://github.com/pycqa/flake8 rev: 3.9.2 hooks: - id: flake8 diff --git a/CHANGELOG.md b/CHANGELOG.md index 383ddfed2..4669fb62f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,26 @@ [1]: https://pypi.org/project/google-cloud-storage/#history +## [2.8.0](https://github.com/googleapis/python-storage/compare/v2.7.0...v2.8.0) (2023-03-29) + + +### Features + +* Add multiprocessing and chunked downloading to transfer manager ([#1002](https://github.com/googleapis/python-storage/issues/1002)) ([e65316b](https://github.com/googleapis/python-storage/commit/e65316b5352a4e15c4dba806e899ad58f8665464)) + + +### Bug Fixes + +* Add trove classifier for python 3.11 ([#971](https://github.com/googleapis/python-storage/issues/971)) ([7886376](https://github.com/googleapis/python-storage/commit/7886376e5105f705a5fe9d061463cf1e033aecd0)) +* Remove use of deprecated cgi module ([#1006](https://github.com/googleapis/python-storage/issues/1006)) ([3071832](https://github.com/googleapis/python-storage/commit/30718322f6c7b1d7a3e4cfd44b6e1796f721b655)) + + +### Documentation + +* Add clarifications to read timeout ([#873](https://github.com/googleapis/python-storage/issues/873)) ([8fb26f4](https://github.com/googleapis/python-storage/commit/8fb26f439cf28ac4ec7a841db1cd0fd60ea77362)) +* Fix c.g.c structure ([#982](https://github.com/googleapis/python-storage/issues/982)) ([d5a2931](https://github.com/googleapis/python-storage/commit/d5a29318b5c68678ea63eb40a4dfede562f8963e)) +* Update c.g.c docs and guides ([#994](https://github.com/googleapis/python-storage/issues/994)) ([62b4a50](https://github.com/googleapis/python-storage/commit/62b4a500e40860c54c53d12323434d28739f9812)) + ## [2.7.0](https://github.com/googleapis/python-storage/compare/v2.6.0...v2.7.0) (2022-12-07) diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index f0118678a..80c4bfb58 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -21,8 +21,8 @@ In order to add a feature: - The feature must be documented in both the API and narrative documentation. -- The feature must work fully on the following CPython versions: 2.7, - 3.5, 3.6, 3.7, 3.8, 3.9 and 3.10 on both UNIX and Windows. +- The feature must work fully on the following CPython versions: + 3.7, 3.8, 3.9, 3.10 and 3.11 on both UNIX and Windows. - The feature must not add unnecessary dependencies (where "unnecessary" is of course subjective, but new dependencies should diff --git a/README.rst b/README.rst index 3b2f84736..61b5a62eb 100644 --- a/README.rst +++ b/README.rst @@ -1,12 +1,21 @@ -Python Client for Google Cloud Storage API -========================================== +Python Client for Google Cloud Storage +====================================== |stable| |pypi| |versions| -`Google Cloud Storage API`_: is a durable and highly available object storage service. Google Cloud Storage is almost infinitely scalable and guarantees consistency: when a write succeeds, the latest copy of the object will be returned to any GET, globally. +`Google Cloud Storage`_ is a managed service for storing unstructured data. Cloud Storage +allows world-wide storage and retrieval of any amount of data at any time. You can use +Cloud Storage for a range of scenarios including serving website content, storing data +for archival and disaster recovery, or distributing large data objects to users via direct download. + +A comprehensive list of changes in each version may be found in the `CHANGELOG`_. -- `Client Library Documentation`_ - `Product Documentation`_ +- `Client Library Documentation`_ +- `github.com/googleapis/python-storage`_ + +Read more about the client libraries for Cloud APIs, including the older +Google APIs Client Libraries, in `Client Libraries Explained`_. .. |stable| image:: https://img.shields.io/badge/support-stable-gold.svg :target: https://github.com/googleapis/google-cloud-python/blob/main/README.rst#stability-levels @@ -14,43 +23,51 @@ Python Client for Google Cloud Storage API :target: https://pypi.org/project/google-cloud-storage/ .. |versions| image:: https://img.shields.io/pypi/pyversions/google-cloud-storage.svg :target: https://pypi.org/project/google-cloud-storage/ -.. _Google Cloud Storage API: https://cloud.google.com/storage +.. _Google Cloud Storage: https://cloud.google.com/storage .. _Client Library Documentation: https://cloud.google.com/python/docs/reference/storage/latest .. _Product Documentation: https://cloud.google.com/storage +.. _CHANGELOG: https://github.com/googleapis/python-storage/blob/main/CHANGELOG.md +.. _github.com/googleapis/python-storage: https://github.com/googleapis/python-storage +.. _Client Libraries Explained: https://cloud.google.com/apis/docs/client-libraries-explained Quick Start ----------- -In order to use this library, you first need to go through the following steps: +In order to use this library, you first need to go through the following steps. +A step-by-step guide may also be found in `Get Started with Client Libraries`_. 1. `Select or create a Cloud Platform project.`_ 2. `Enable billing for your project.`_ 3. `Enable the Google Cloud Storage API.`_ 4. `Setup Authentication.`_ +.. _Get Started with Client Libraries: https://cloud.google.com/storage/docs/reference/libraries#client-libraries-install-python .. _Select or create a Cloud Platform project.: https://console.cloud.google.com/project .. _Enable billing for your project.: https://cloud.google.com/billing/docs/how-to/modify-project#enable_billing_for_a_project -.. _Enable the Google Cloud Storage API.: https://cloud.google.com/storage -.. _Setup Authentication.: https://googleapis.dev/python/google-api-core/latest/auth.html +.. _Enable the Google Cloud Storage API.: https://console.cloud.google.com/flows/enableapi?apiid=storage-api.googleapis.com +.. _Setup Authentication.: https://cloud.google.com/docs/authentication/client-libraries Installation ~~~~~~~~~~~~ -Install this library in a `virtualenv`_ using pip. `virtualenv`_ is a tool to -create isolated Python environments. The basic problem it addresses is one of -dependencies and versions, and indirectly permissions. +Install this library in a virtual environment using `venv`_. `venv`_ is a tool that +creates isolated Python environments. These isolated environments can have separate +versions of Python packages, which allows you to isolate one project's dependencies +from the dependencies of other projects. -With `virtualenv`_, it's possible to install this library without needing system +With `venv`_, it's possible to install this library without needing system install permissions, and without clashing with the installed system dependencies. -.. _`virtualenv`: https://virtualenv.pypa.io/en/latest/ +.. _`venv`: https://docs.python.org/3/library/venv.html Code samples and snippets ~~~~~~~~~~~~~~~~~~~~~~~~~ -Code samples and snippets live in the `samples/` folder. +Code samples and snippets live in the `samples/`_ folder. + +.. _`samples/`: https://github.com/googleapis/python-storage/tree/main/samples Supported Python Versions @@ -77,10 +94,9 @@ Mac/Linux .. code-block:: console - pip install virtualenv - virtualenv + python3 -m venv source /bin/activate - /bin/pip install google-cloud-storage + pip install google-cloud-storage Windows @@ -88,20 +104,19 @@ Windows .. code-block:: console - pip install virtualenv - virtualenv - \Scripts\activate - \Scripts\pip.exe install google-cloud-storage + py -m venv + .\\Scripts\activate + pip install google-cloud-storage Next Steps ~~~~~~~~~~ +- Read the `Google Cloud Storage Product documentation`_ to learn + more about the product and see How-to Guides. - Read the `Client Library Documentation`_ for Google Cloud Storage API to see other available methods on the client. -- Read the `Google Cloud Storage API Product documentation`_ to learn - more about the product and see How-to Guides. - View this `README`_ to see the full list of Cloud APIs that we cover. -.. _Google Cloud Storage API Product documentation: https://cloud.google.com/storage +.. _Google Cloud Storage Product documentation: https://cloud.google.com/storage .. _README: https://github.com/googleapis/google-cloud-python/blob/main/README.rst diff --git a/docs/acl_guide.rst b/docs/acl_guide.rst new file mode 100644 index 000000000..13ba4e660 --- /dev/null +++ b/docs/acl_guide.rst @@ -0,0 +1,165 @@ +Managing Access to Data +======================= + +Cloud Storage offers two systems for granting users access your buckets and objects: +IAM and Access Control Lists (ACLs). These systems act in parallel - in order for a user to +access a Cloud Storage resource, only one of the systems needs to grant that user permission. +For additional access control options, see also: +`Cloud Storage Control Access to Data `_ + + +ACL +--- + +Cloud Storage uses access control lists (ACLs) to manage object and bucket access. +ACLs are the mechanism you use to share files with other users and allow +other users to access your buckets and files. + +ACLs are suitable for fine-grained control, but you may prefer using IAM to +control access at the project level. + + +:class:`google.cloud.storage.bucket.Bucket` has a getting method that creates +an ACL object under the hood, and you can interact with that using +:func:`google.cloud.storage.bucket.Bucket.acl`: + +.. code-block:: python + + client = storage.Client() + bucket = client.get_bucket(bucket_name) + acl = bucket.acl + +Adding and removing permissions can be done with the following methods +(in increasing order of granularity): + +- :func:`ACL.all` + corresponds to access for all users. +- :func:`ACL.all_authenticated` corresponds + to access for all users that are signed into a Google account. +- :func:`ACL.domain` corresponds to access on a + per Google Apps domain (ie, ``example.com``). +- :func:`ACL.group` corresponds to access on a + per group basis (either by ID or e-mail address). +- :func:`ACL.user` corresponds to access on a + per user basis (either by ID or e-mail address). + +And you are able to ``grant`` and ``revoke`` the following roles: + +- **Reading**: + :func:`_ACLEntity.grant_read` and :func:`_ACLEntity.revoke_read` +- **Writing**: + :func:`_ACLEntity.grant_write` and :func:`_ACLEntity.revoke_write` +- **Owning**: + :func:`_ACLEntity.grant_owner` and :func:`_ACLEntity.revoke_owner` + +You can use any of these like any other factory method (these happen to +be :class:`_ACLEntity` factories): + +.. code-block:: python + + acl.user("me@example.org").grant_read() + acl.all_authenticated().grant_write() + +After that, you can save any changes you make with the +:func:`google.cloud.storage.acl.ACL.save` method: + +.. code-block:: python + + acl.save() + + +You can alternatively save any existing :class:`google.cloud.storage.acl.ACL` +object (whether it was created by a factory method or not) from a +:class:`google.cloud.storage.bucket.Bucket`: + +.. code-block:: python + + bucket.acl.save(acl=acl) + + +To get the list of ``entity`` and ``role`` for each unique pair, the +:class:`ACL` class is iterable: + +.. code-block:: python + + print(list(acl)) + # [{'role': 'OWNER', 'entity': 'allUsers'}, ...] + + +This list of tuples can be used as the ``entity`` and ``role`` fields +when sending metadata for ACLs to the API. + + +IAM +--- + +Identity and Access Management (IAM) controls permissioning throughout Google Cloud and allows you +to grant permissions at the bucket and project levels. You should use IAM for any permissions that +apply to multiple objects in a bucket to reduce the risks of unintended exposure. To use IAM +exclusively, enable uniform bucket-level access to disallow ACLs for all Cloud Storage resources. +See also: +`Additional access control options `_ + +Constants used across IAM roles: +:::::::::::::::::::::::::::::::: + +- ``STORAGE_OBJECT_CREATOR_ROLE = "roles/storage.objectCreator"`` + corresponds to role implying rights to create objects, but not delete or overwrite them. +- ``STORAGE_OBJECT_VIEWER_ROLE = "roles/storage.objectViewer"`` + corresponds to role implying rights to view object properties, excluding ACLs. +- ``STORAGE_OBJECT_ADMIN_ROLE = "roles/storage.objectAdmin"`` + corresponds to role implying full control of objects. +- ``STORAGE_ADMIN_ROLE = "roles/storage.admin"`` + corresponds to role implying full control of objects and buckets. +- ``STORAGE_VIEWER_ROLE = "Viewer"`` + corresponds to role that can list buckets. +- ``STORAGE_EDITOR_ROLE = "Editor"`` + corresponds to role that can create, list, and delete buckets. +- ``STORAGE_OWNER_ROLE = "Owners"`` + corresponds to role that can Can create, list, and delete buckets; + and list tag bindings; and control HMAC keys in the project. + +Constants used across IAM permissions: +:::::::::::::::::::::::::::::::::::::: + +- ``STORAGE_BUCKETS_CREATE = "storage.buckets.create"`` + corresponds to permission that can create buckets. + +- ``STORAGE_BUCKETS_DELETE = "storage.buckets.delete"`` + corresponds to permission that can delete buckets. + +- ``STORAGE_BUCKETS_GET = "storage.buckets.get"`` + corresponds to permission that can read bucket metadata, excluding ACLs. + +- ``STORAGE_BUCKETS_LIST = "storage.buckets.list"`` + corresponds to permission that can list buckets. + +- ``STORAGE_BUCKETS_GET_IAM_POLICY = "storage.buckets.getIamPolicy"`` + corresponds to permission that can read bucket ACLs. + +- ``STORAGE_BUCKETS_SET_IAM_POLICY = "storage.buckets.setIamPolicy"`` + corresponds to permission that can update bucket ACLs. + +- ``STORAGE_BUCKETS_UPDATE = "storage.buckets.update"`` + corresponds to permission that can update buckets, excluding ACLS. + +- ``STORAGE_OBJECTS_CREATE = "storage.objects.create"`` + corresponds to permission that can add new objects to a bucket. + +- ``STORAGE_OBJECTS_DELETE = "storage.objects.delete"`` + corresponds to permission that can delete objects. + +- ``STORAGE_OBJECTS_GET = "storage.objects.get"`` + corresponds to permission that can read object data / metadata, excluding ACLs. + +- ``STORAGE_OBJECTS_LIST = "storage.objects.list"`` + corresponds to permission that can list objects in a bucket. + +- ``STORAGE_OBJECTS_GET_IAM_POLICY = "storage.objects.getIamPolicy"`` + corresponds to permission that can read object ACLs. + +- ``STORAGE_OBJECTS_SET_IAM_POLICY = "storage.objects.setIamPolicy"`` + corresponds to permission that can update object ACLs. + +- ``STORAGE_OBJECTS_UPDATE = "storage.objects.update"`` + corresponds to permission that can update object metadata, excluding ACLs. diff --git a/docs/storage/generation_metageneration.rst b/docs/generation_metageneration.rst similarity index 100% rename from docs/storage/generation_metageneration.rst rename to docs/generation_metageneration.rst diff --git a/docs/index.rst b/docs/index.rst index 5a9109944..07d236e25 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -8,12 +8,31 @@ :class:`multiprocessing.Pool` or :class:`multiprocessing.Process` invokes :func:`os.fork`. +Guides +------ +.. toctree:: + :maxdepth: 2 + + acl_guide + generation_metageneration + retry_timeout + API Reference ------------- .. toctree:: :maxdepth: 2 - storage/modules + storage/acl + storage/batch + storage/blob + storage/bucket + storage/client + storage/constants + storage/fileio + storage/hmac_key + storage/notification + storage/retry + More Examples ------------- diff --git a/docs/storage/retry_timeout.rst b/docs/retry_timeout.rst similarity index 85% rename from docs/storage/retry_timeout.rst rename to docs/retry_timeout.rst index bc1912658..c9911a3f2 100644 --- a/docs/storage/retry_timeout.rst +++ b/docs/retry_timeout.rst @@ -12,22 +12,27 @@ Configuring Timeouts -------------------- For a number of reasons, methods which invoke API methods may take -longer than expected or desired. By default, such methods all time out -after a default interval, 60.0 seconds. Rather than blocking your application -code for that interval, you may choose to configure explicit timeouts -in your code, using one of three forms: +longer than expected or desired. By default, such methods are applied a +default timeout of 60.0 seconds. -- You can pass a single integer or float which functions as the timeout for the - entire request. E.g.: +The python-storage client uses the timeout mechanics of the underlying +``requests`` HTTP library. The connect timeout is the number of seconds +to establish a connection to the server. The read timeout is the number +of seconds the client will wait for the server to send a response. +In most cases, this is the maximum wait time before the server sends +the first byte. Please refer to the `requests documentation `_ for details. + +You may also choose to configure explicit timeouts in your code, using one of three forms: + +- You can specify a single value for the timeout. The timeout value will be + applied to both the connect and the read timeouts. E.g.: .. code-block:: python bucket = client.get_bucket(BUCKET_NAME, timeout=300.0) # five minutes -- You can also be passed as a two-tuple, ``(connect_timeout, read_timeout)``, - where the ``connect_timeout`` sets the maximum time required to establish - the connection to the server, and the ``read_timeout`` sets the maximum - time to wait for a completed response. E.g.: +- You can also pass a two-tuple, ``(connect_timeout, read_timeout)``, + if you would like to set the values separately. E.g.: .. code-block:: python diff --git a/docs/storage/snippets.py b/docs/snippets.py similarity index 100% rename from docs/storage/snippets.py rename to docs/snippets.py diff --git a/docs/storage/acl.rst b/docs/storage/acl.rst index f96cd6597..4c8562626 100644 --- a/docs/storage/acl.rst +++ b/docs/storage/acl.rst @@ -1,88 +1,5 @@ -ACL -=== - -Cloud Storage uses access control lists (ACLs) to manage object and bucket access. -ACLs are the mechanism you use to share files with other users and allow -other users to access your buckets and files. - -ACLs are suitable for fine-grained control, but you may prefer using IAM to -control access at the project level. See also: -`Cloud Storage Control Access to Data `_ - - -:class:`google.cloud.storage.bucket.Bucket` has a getting method that creates -an ACL object under the hood, and you can interact with that using -:func:`google.cloud.storage.bucket.Bucket.acl`: - -.. code-block:: python - - client = storage.Client() - bucket = client.get_bucket(bucket_name) - acl = bucket.acl - -Adding and removing permissions can be done with the following methods -(in increasing order of granularity): - -- :func:`ACL.all` - corresponds to access for all users. -- :func:`ACL.all_authenticated` corresponds - to access for all users that are signed into a Google account. -- :func:`ACL.domain` corresponds to access on a - per Google Apps domain (ie, ``example.com``). -- :func:`ACL.group` corresponds to access on a - per group basis (either by ID or e-mail address). -- :func:`ACL.user` corresponds to access on a - per user basis (either by ID or e-mail address). - -And you are able to ``grant`` and ``revoke`` the following roles: - -- **Reading**: - :func:`_ACLEntity.grant_read` and :func:`_ACLEntity.revoke_read` -- **Writing**: - :func:`_ACLEntity.grant_write` and :func:`_ACLEntity.revoke_write` -- **Owning**: - :func:`_ACLEntity.grant_owner` and :func:`_ACLEntity.revoke_owner` - -You can use any of these like any other factory method (these happen to -be :class:`_ACLEntity` factories): - -.. code-block:: python - - acl.user("me@example.org").grant_read() - acl.all_authenticated().grant_write() - -After that, you can save any changes you make with the -:func:`google.cloud.storage.acl.ACL.save` method: - -.. code-block:: python - - acl.save() - - -You can alternatively save any existing :class:`google.cloud.storage.acl.ACL` -object (whether it was created by a factory method or not) from a -:class:`google.cloud.storage.bucket.Bucket`: - -.. code-block:: python - - bucket.acl.save(acl=acl) - - -To get the list of ``entity`` and ``role`` for each unique pair, the -:class:`ACL` class is iterable: - -.. code-block:: python - - print(list(acl)) - # [{'role': 'OWNER', 'entity': 'allUsers'}, ...] - - -This list of tuples can be used as the ``entity`` and ``role`` fields -when sending metadata for ACLs to the API. - - ACL Module ----------- +----------- .. automodule:: google.cloud.storage.acl :members: diff --git a/docs/storage/blobs.rst b/docs/storage/blob.rst similarity index 100% rename from docs/storage/blobs.rst rename to docs/storage/blob.rst diff --git a/docs/storage/buckets.rst b/docs/storage/bucket.rst similarity index 93% rename from docs/storage/buckets.rst rename to docs/storage/bucket.rst index c42d7e303..e63fe2115 100644 --- a/docs/storage/buckets.rst +++ b/docs/storage/bucket.rst @@ -1,4 +1,4 @@ -Buckets +Bucket ~~~~~~~ .. automodule:: google.cloud.storage.bucket diff --git a/docs/storage/modules.rst b/docs/storage/modules.rst deleted file mode 100644 index 9148a4385..000000000 --- a/docs/storage/modules.rst +++ /dev/null @@ -1,17 +0,0 @@ -Modules for Python Storage --------------------------- -.. toctree:: - :maxdepth: 2 - - client - blobs - buckets - acl - batch - fileio - constants - hmac_key - notification - retry - retry_timeout - generation_metageneration \ No newline at end of file diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py index 6f4952f44..a6f5222ea 100644 --- a/google/cloud/storage/blob.py +++ b/google/cloud/storage/blob.py @@ -18,7 +18,6 @@ """ import base64 -import cgi import copy import hashlib from io import BytesIO @@ -27,6 +26,7 @@ import mimetypes import os import re +from email.parser import HeaderParser from urllib.parse import parse_qsl from urllib.parse import quote from urllib.parse import urlencode @@ -1628,7 +1628,8 @@ def download_as_text( return data.decode(encoding) if self.content_type is not None: - _, params = cgi.parse_header(self.content_type) + msg = HeaderParser().parsestr("Content-Type: " + self.content_type) + params = dict(msg.get_params()[1:]) if "charset" in params: return data.decode(params["charset"]) diff --git a/google/cloud/storage/client.py b/google/cloud/storage/client.py index f54bf6043..796f1c654 100644 --- a/google/cloud/storage/client.py +++ b/google/cloud/storage/client.py @@ -130,6 +130,11 @@ def __init__( if project is _marker: project = None + # Save the initial value of client_info and client_options before they + # are passed along, for use in __reduce__ defined elsewhere. + self._initial_client_info = client_info + self._initial_client_options = client_options + kw_args = {"client_info": client_info} # `api_endpoint` should be only set by the user via `client_options`, diff --git a/google/cloud/storage/constants.py b/google/cloud/storage/constants.py index 5d6497295..eba0a19df 100644 --- a/google/cloud/storage/constants.py +++ b/google/cloud/storage/constants.py @@ -12,7 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Constants used across google.cloud.storage modules.""" +"""Constants used across google.cloud.storage modules. + +See [Python Storage Client Constants Page](https://github.com/googleapis/python-storage/blob/main/google/cloud/storage/constants.py) +for constants used across storage classes, location types, public access prevention, etc. + +""" # Storage classes diff --git a/google/cloud/storage/transfer_manager.py b/google/cloud/storage/transfer_manager.py index e87f0cc76..8de9c6c7b 100644 --- a/google/cloud/storage/transfer_manager.py +++ b/google/cloud/storage/transfer_manager.py @@ -16,10 +16,16 @@ import concurrent.futures +import io +import inspect import os import warnings +import pickle +import copyreg from google.api_core import exceptions +from google.cloud.storage import Client +from google.cloud.storage import Blob warnings.warn( "The module `transfer_manager` is a preview feature. Functionality and API " @@ -27,16 +33,54 @@ ) -DEFAULT_CHUNK_SIZE = 200 * 1024 * 1024 +TM_DEFAULT_CHUNK_SIZE = 32 * 1024 * 1024 +DEFAULT_MAX_WORKERS = 8 +# Constants to be passed in as `worker_type`. +PROCESS = "process" +THREAD = "thread" + + +_cached_clients = {} + + +def _deprecate_threads_param(func): + def convert_threads_or_raise(*args, **kwargs): + binding = inspect.signature(func).bind(*args, **kwargs) + threads = binding.arguments.get("threads") + if threads: + worker_type = binding.arguments.get("worker_type") + max_workers = binding.arguments.get("max_workers") + if worker_type or max_workers: # Parameter conflict + raise ValueError( + "The `threads` parameter is deprecated and conflicts with its replacement parameters, `worker_type` and `max_workers`." + ) + # No conflict, so issue a warning and set worker_type and max_workers. + warnings.warn( + "The `threads` parameter is deprecated. Please use `worker_type` and `max_workers` parameters instead." + ) + args = binding.args + kwargs = binding.kwargs + kwargs["worker_type"] = THREAD + kwargs["max_workers"] = threads + return func(*args, **kwargs) + else: + return func(*args, **kwargs) + + return convert_threads_or_raise + + +@_deprecate_threads_param def upload_many( file_blob_pairs, skip_if_exists=False, upload_kwargs=None, - threads=4, + threads=None, deadline=None, raise_exception=False, + worker_type=PROCESS, + max_workers=DEFAULT_MAX_WORKERS, ): """Upload many files concurrently via a worker pool. @@ -48,6 +92,9 @@ def upload_many( uploaded to the corresponding blob by using blob.upload_from_file() or blob.upload_from_filename() as appropriate. + File handlers are only supported if worker_type is set to THREAD. + If worker_type is set to PROCESS, please use filenames only. + :type skip_if_exists: bool :param skip_if_exists: If True, blobs that already have a live version will not be overwritten. @@ -65,14 +112,10 @@ def upload_many( :type threads: int :param threads: - The number of threads to use in the worker pool. This is passed to - `concurrent.futures.ThreadPoolExecutor` as the `max_worker`; refer - to standard library documentation for details. - - The performance impact of this value depends on the use case, but - generally, smaller files benefit from more threads and larger files - don't benefit from more threads. Too many threads can slow operations, - especially with large files, due to contention over the Python GIL. + ***DEPRECATED*** Sets `worker_type` to THREAD and `max_workers` to the + number specified. If `worker_type` or `max_workers` are set explicitly, + this parameter should be set to None. Please use `worker_type` and + `max_workers` instead of this parameter. :type deadline: int :param deadline: @@ -92,6 +135,40 @@ def upload_many( If skip_if_exists is True, 412 Precondition Failed responses are considered part of normal operation and are not raised as an exception. + :type worker_type: str + :param worker_type: + The worker type to use; one of google.cloud.storage.transfer_manager.PROCESS + or google.cloud.storage.transfer_manager.THREAD. + + Although the exact performance impact depends on the use case, in most + situations the PROCESS worker type will use more system resources (both + memory and CPU) and result in faster operations than THREAD workers. + + Because the subprocesses of the PROCESS worker type can't access memory + from the main process, Client objects have to be serialized and then + recreated in each subprocess. The serialization of the Client object + for use in subprocesses is an approximation and may not capture every + detail of the Client object, especially if the Client was modified after + its initial creation or if `Client._http` was modified in any way. + + THREAD worker types are observed to be relatively efficient for + operations with many small files, but not for operations with large + files. PROCESS workers are recommended for large file operations. + + PROCESS workers do not support writing to file handlers. Please refer + to files by filename only when using PROCESS workers. + + :type max_workers: int + :param max_workers: + The maximum number of workers to create to handle the workload. + + With PROCESS workers, a larger number of workers will consume more + system resources (memory and CPU) at once. + + How many workers is optimal depends heavily on the specific use case, + and the default is a conservative number that should work okay in most + cases without consuming excessive resources. + :raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded. :rtype: list @@ -103,21 +180,37 @@ def upload_many( if upload_kwargs is None: upload_kwargs = {} if skip_if_exists: + upload_kwargs = upload_kwargs.copy() upload_kwargs["if_generation_match"] = 0 - with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor: + pool_class, needs_pickling = _get_pool_class_and_requirements(worker_type) + + with pool_class(max_workers=max_workers) as executor: futures = [] for path_or_file, blob in file_blob_pairs: - method = ( - blob.upload_from_filename - if isinstance(path_or_file, str) - else blob.upload_from_file + # File objects are only supported by the THREAD worker because they can't + # be pickled. + if needs_pickling and not isinstance(path_or_file, str): + raise ValueError( + "Passing in a file object is only supported by the THREAD worker type. Please either select THREAD workers, or pass in filenames only." + ) + + futures.append( + executor.submit( + _call_method_on_maybe_pickled_blob, + _pickle_blob(blob) if needs_pickling else blob, + "upload_from_filename" + if isinstance(path_or_file, str) + else "upload_from_file", + path_or_file, + **upload_kwargs, + ) ) - futures.append(executor.submit(method, path_or_file, **upload_kwargs)) + concurrent.futures.wait( + futures, timeout=deadline, return_when=concurrent.futures.ALL_COMPLETED + ) + results = [] - concurrent.futures.wait( - futures, timeout=deadline, return_when=concurrent.futures.ALL_COMPLETED - ) for future in futures: exp = future.exception() @@ -134,12 +227,15 @@ def upload_many( return results +@_deprecate_threads_param def download_many( blob_file_pairs, download_kwargs=None, - threads=4, + threads=None, deadline=None, raise_exception=False, + worker_type=PROCESS, + max_workers=DEFAULT_MAX_WORKERS, ): """Download many blobs concurrently via a worker pool. @@ -154,6 +250,9 @@ def download_many( Note that blob.download_to_filename() does not delete the destination file if the download fails. + File handlers are only supported if worker_type is set to THREAD. + If worker_type is set to PROCESS, please use filenames only. + :type download_kwargs: dict :param download_kwargs: A dictionary of keyword arguments to pass to the download method. Refer @@ -163,14 +262,10 @@ def download_many( :type threads: int :param threads: - The number of threads to use in the worker pool. This is passed to - `concurrent.futures.ThreadPoolExecutor` as the `max_worker`; refer - to standard library documentation for details. - - The performance impact of this value depends on the use case, but - generally, smaller files benefit from more threads and larger files - don't benefit from more threads. Too many threads can slow operations, - especially with large files, due to contention over the Python GIL. + ***DEPRECATED*** Sets `worker_type` to THREAD and `max_workers` to the + number specified. If `worker_type` or `max_workers` are set explicitly, + this parameter should be set to None. Please use `worker_type` and + `max_workers` instead of this parameter. :type deadline: int :param deadline: @@ -187,6 +282,40 @@ def download_many( are only processed and potentially raised after all operations are complete in success or failure. + :type worker_type: str + :param worker_type: + The worker type to use; one of google.cloud.storage.transfer_manager.PROCESS + or google.cloud.storage.transfer_manager.THREAD. + + Although the exact performance impact depends on the use case, in most + situations the PROCESS worker type will use more system resources (both + memory and CPU) and result in faster operations than THREAD workers. + + Because the subprocesses of the PROCESS worker type can't access memory + from the main process, Client objects have to be serialized and then + recreated in each subprocess. The serialization of the Client object + for use in subprocesses is an approximation and may not capture every + detail of the Client object, especially if the Client was modified after + its initial creation or if `Client._http` was modified in any way. + + THREAD worker types are observed to be relatively efficient for + operations with many small files, but not for operations with large + files. PROCESS workers are recommended for large file operations. + + PROCESS workers do not support writing to file handlers. Please refer + to files by filename only when using PROCESS workers. + + :type max_workers: int + :param max_workers: + The maximum number of workers to create to handle the workload. + + With PROCESS workers, a larger number of workers will consume more + system resources (memory and CPU) at once. + + How many workers is optimal depends heavily on the specific use case, + and the default is a conservative number that should work okay in most + cases without consuming excessive resources. + :raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded. :rtype: list @@ -198,29 +327,48 @@ def download_many( if download_kwargs is None: download_kwargs = {} - with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor: + + pool_class, needs_pickling = _get_pool_class_and_requirements(worker_type) + + with pool_class(max_workers=max_workers) as executor: futures = [] for blob, path_or_file in blob_file_pairs: - method = ( - blob.download_to_filename - if isinstance(path_or_file, str) - else blob.download_to_file + # File objects are only supported by the THREAD worker because they can't + # be pickled. + if needs_pickling and not isinstance(path_or_file, str): + raise ValueError( + "Passing in a file object is only supported by the THREAD worker type. Please either select THREAD workers, or pass in filenames only." + ) + + futures.append( + executor.submit( + _call_method_on_maybe_pickled_blob, + _pickle_blob(blob) if needs_pickling else blob, + "download_to_filename" + if isinstance(path_or_file, str) + else "download_to_file", + path_or_file, + **download_kwargs, + ) ) - futures.append(executor.submit(method, path_or_file, **download_kwargs)) + concurrent.futures.wait( + futures, timeout=deadline, return_when=concurrent.futures.ALL_COMPLETED + ) + results = [] - concurrent.futures.wait( - futures, timeout=deadline, return_when=concurrent.futures.ALL_COMPLETED - ) for future in futures: + # If raise_exception is False, don't call future.result() if not raise_exception: exp = future.exception() if exp: results.append(exp) continue + # Get the real result. If there was an exception, this will raise it. results.append(future.result()) return results +@_deprecate_threads_param def upload_many_from_filenames( bucket, filenames, @@ -229,9 +377,11 @@ def upload_many_from_filenames( skip_if_exists=False, blob_constructor_kwargs=None, upload_kwargs=None, - threads=4, + threads=None, deadline=None, raise_exception=False, + worker_type=PROCESS, + max_workers=DEFAULT_MAX_WORKERS, ): """Upload many files concurrently by their filenames. @@ -309,14 +459,10 @@ def upload_many_from_filenames( :type threads: int :param threads: - The number of threads to use in the worker pool. This is passed to - `concurrent.futures.ThreadPoolExecutor` as the `max_worker`; refer - to standard library documentation for details. - - The performance impact of this value depends on the use case, but - generally, smaller files benefit from more threads and larger files - don't benefit from more threads. Too many threads can slow operations, - especially with large files, due to contention over the Python GIL. + ***DEPRECATED*** Sets `worker_type` to THREAD and `max_workers` to the + number specified. If `worker_type` or `max_workers` are set explicitly, + this parameter should be set to None. Please use `worker_type` and + `max_workers` instead of this parameter. :type deadline: int :param deadline: @@ -336,6 +482,37 @@ def upload_many_from_filenames( If skip_if_exists is True, 412 Precondition Failed responses are considered part of normal operation and are not raised as an exception. + :type worker_type: str + :param worker_type: + The worker type to use; one of google.cloud.storage.transfer_manager.PROCESS + or google.cloud.storage.transfer_manager.THREAD. + + Although the exact performance impact depends on the use case, in most + situations the PROCESS worker type will use more system resources (both + memory and CPU) and result in faster operations than THREAD workers. + + Because the subprocesses of the PROCESS worker type can't access memory + from the main process, Client objects have to be serialized and then + recreated in each subprocess. The serialization of the Client object + for use in subprocesses is an approximation and may not capture every + detail of the Client object, especially if the Client was modified after + its initial creation or if `Client._http` was modified in any way. + + THREAD worker types are observed to be relatively efficient for + operations with many small files, but not for operations with large + files. PROCESS workers are recommended for large file operations. + + :type max_workers: int + :param max_workers: + The maximum number of workers to create to handle the workload. + + With PROCESS workers, a larger number of workers will consume more + system resources (memory and CPU) at once. + + How many workers is optimal depends heavily on the specific use case, + and the default is a conservative number that should work okay in most + cases without consuming excessive resources. + :raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded. :rtype: list @@ -359,22 +536,26 @@ def upload_many_from_filenames( file_blob_pairs, skip_if_exists=skip_if_exists, upload_kwargs=upload_kwargs, - threads=threads, deadline=deadline, raise_exception=raise_exception, + worker_type=worker_type, + max_workers=max_workers, ) +@_deprecate_threads_param def download_many_to_path( bucket, blob_names, destination_directory="", blob_name_prefix="", download_kwargs=None, - threads=4, + threads=None, deadline=None, create_directories=True, raise_exception=False, + worker_type=PROCESS, + max_workers=DEFAULT_MAX_WORKERS, ): """Download many files concurrently by their blob names. @@ -442,14 +623,10 @@ def download_many_to_path( :type threads: int :param threads: - The number of threads to use in the worker pool. This is passed to - `concurrent.futures.ThreadPoolExecutor` as the `max_worker` param; refer - to standard library documentation for details. - - The performance impact of this value depends on the use case, but - generally, smaller files benefit from more threads and larger files - don't benefit from more threads. Too many threads can slow operations, - especially with large files, due to contention over the Python GIL. + ***DEPRECATED*** Sets `worker_type` to THREAD and `max_workers` to the + number specified. If `worker_type` or `max_workers` are set explicitly, + this parameter should be set to None. Please use `worker_type` and + `max_workers` instead of this parameter. :type deadline: int :param deadline: @@ -474,6 +651,37 @@ def download_many_to_path( Precondition Failed responses are considered part of normal operation and are not raised as an exception. + :type worker_type: str + :param worker_type: + The worker type to use; one of google.cloud.storage.transfer_manager.PROCESS + or google.cloud.storage.transfer_manager.THREAD. + + Although the exact performance impact depends on the use case, in most + situations the PROCESS worker type will use more system resources (both + memory and CPU) and result in faster operations than THREAD workers. + + Because the subprocesses of the PROCESS worker type can't access memory + from the main process, Client objects have to be serialized and then + recreated in each subprocess. The serialization of the Client object + for use in subprocesses is an approximation and may not capture every + detail of the Client object, especially if the Client was modified after + its initial creation or if `Client._http` was modified in any way. + + THREAD worker types are observed to be relatively efficient for + operations with many small files, but not for operations with large + files. PROCESS workers are recommended for large file operations. + + :type max_workers: int + :param max_workers: + The maximum number of workers to create to handle the workload. + + With PROCESS workers, a larger number of workers will consume more + system resources (memory and CPU) at once. + + How many workers is optimal depends heavily on the specific use case, + and the default is a conservative number that should work okay in most + cases without consuming excessive resources. + :raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded. :rtype: list @@ -495,7 +703,237 @@ def download_many_to_path( return download_many( blob_file_pairs, download_kwargs=download_kwargs, - threads=threads, deadline=deadline, raise_exception=raise_exception, + worker_type=worker_type, + max_workers=max_workers, ) + + +def download_chunks_concurrently( + blob, + filename, + chunk_size=TM_DEFAULT_CHUNK_SIZE, + download_kwargs=None, + deadline=None, + worker_type=PROCESS, + max_workers=DEFAULT_MAX_WORKERS, +): + """Download a single file in chunks, concurrently. + + This function is a PREVIEW FEATURE: the API may change in a future version. + + In some environments, using this feature with mutiple processes will result + in faster downloads of large files. + + Using this feature with multiple threads is unlikely to improve download + performance under normal circumstances due to Python interpreter threading + behavior. The default is therefore to use processes instead of threads. + + Checksumming (md5 or crc32c) is not supported for chunked operations. Any + `checksum` parameter passed in to download_kwargs will be ignored. + + :type bucket: 'google.cloud.storage.bucket.Bucket' + :param bucket: + The bucket which contains the blobs to be downloaded + + :type blob: `google.cloud.storage.Blob` + :param blob: + The blob to be downloaded. + + :type filename: str + :param filename: + The destination filename or path. + + :type download_kwargs: dict + :param download_kwargs: + A dictionary of keyword arguments to pass to the download method. Refer + to the documentation for blob.download_to_file() or + blob.download_to_filename() for more information. The dict is directly + passed into the download methods and is not validated by this function. + + Keyword arguments "start" and "end" which are not supported and will + cause a ValueError if present. + + :type deadline: int + :param deadline: + The number of seconds to wait for all threads to resolve. If the + deadline is reached, all threads will be terminated regardless of their + progress and concurrent.futures.TimeoutError will be raised. This can be + left as the default of None (no deadline) for most use cases. + + :type worker_type: str + :param worker_type: + The worker type to use; one of google.cloud.storage.transfer_manager.PROCESS + or google.cloud.storage.transfer_manager.THREAD. + + Although the exact performance impact depends on the use case, in most + situations the PROCESS worker type will use more system resources (both + memory and CPU) and result in faster operations than THREAD workers. + + Because the subprocesses of the PROCESS worker type can't access memory + from the main process, Client objects have to be serialized and then + recreated in each subprocess. The serialization of the Client object + for use in subprocesses is an approximation and may not capture every + detail of the Client object, especially if the Client was modified after + its initial creation or if `Client._http` was modified in any way. + + THREAD worker types are observed to be relatively efficient for + operations with many small files, but not for operations with large + files. PROCESS workers are recommended for large file operations. + + :type max_workers: int + :param max_workers: + The maximum number of workers to create to handle the workload. + + With PROCESS workers, a larger number of workers will consume more + system resources (memory and CPU) at once. + + How many workers is optimal depends heavily on the specific use case, + and the default is a conservative number that should work okay in most + cases without consuming excessive resources. + + :raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded. + """ + + if download_kwargs is None: + download_kwargs = {} + if "start" in download_kwargs or "end" in download_kwargs: + raise ValueError( + "Download arguments 'start' and 'end' are not supported by download_chunks_concurrently." + ) + + # We must know the size and the generation of the blob. + if not blob.size or not blob.generation: + blob.reload() + + pool_class, needs_pickling = _get_pool_class_and_requirements(worker_type) + # Pickle the blob ahead of time (just once, not once per chunk) if needed. + maybe_pickled_blob = _pickle_blob(blob) if needs_pickling else blob + + futures = [] + + # Create and/or truncate the destination file to prepare for sparse writing. + with open(filename, "wb") as _: + pass + + with pool_class(max_workers=max_workers) as executor: + cursor = 0 + end = blob.size + while cursor < end: + start = cursor + cursor = min(cursor + chunk_size, end) + futures.append( + executor.submit( + _download_and_write_chunk_in_place, + maybe_pickled_blob, + filename, + start=start, + end=cursor - 1, + download_kwargs=download_kwargs, + ) + ) + + concurrent.futures.wait( + futures, timeout=deadline, return_when=concurrent.futures.ALL_COMPLETED + ) + + # Raise any exceptions. Successful results can be ignored. + for future in futures: + future.result() + return None + + +def _download_and_write_chunk_in_place( + maybe_pickled_blob, filename, start, end, download_kwargs +): + if isinstance(maybe_pickled_blob, Blob): + blob = maybe_pickled_blob + else: + blob = pickle.loads(maybe_pickled_blob) + with open( + filename, "rb+" + ) as f: # Open in mixed read/write mode to avoid truncating or appending + f.seek(start) + return blob.download_to_file(f, start=start, end=end, **download_kwargs) + + +def _call_method_on_maybe_pickled_blob( + maybe_pickled_blob, method_name, *args, **kwargs +): + """Helper function that runs inside a thread or subprocess. + + `maybe_pickled_blob` is either a blob (for threads) or a specially pickled + blob (for processes) because the default pickling mangles clients which are + attached to blobs.""" + + if isinstance(maybe_pickled_blob, Blob): + blob = maybe_pickled_blob + else: + blob = pickle.loads(maybe_pickled_blob) + return getattr(blob, method_name)(*args, **kwargs) + + +def _reduce_client(cl): + """Replicate a Client by constructing a new one with the same params.""" + + client_object_id = id(cl) + project = cl.project + credentials = cl._credentials + _http = None # Can't carry this over + client_info = cl._initial_client_info + client_options = cl._initial_client_options + + return _LazyClient, ( + client_object_id, + project, + credentials, + _http, + client_info, + client_options, + ) + + +def _pickle_blob(blob): + """Pickle a Blob (and its Bucket and Client) and return a bytestring.""" + + # We need a custom pickler to process Client objects, which are attached to + # Buckets (and therefore to Blobs in turn). Unfortunately, the Python + # multiprocessing library doesn't seem to have a good way to use a custom + # pickler, and using copyreg will mutate global state and affect code + # outside of the client library. Instead, we'll pre-pickle the object and + # pass the bytestring in. + f = io.BytesIO() + p = pickle.Pickler(f) + p.dispatch_table = copyreg.dispatch_table.copy() + p.dispatch_table[Client] = _reduce_client + p.dump(blob) + return f.getvalue() + + +def _get_pool_class_and_requirements(worker_type): + """Returns the pool class, and whether the pool requires pickled Blobs.""" + + if worker_type == PROCESS: + # Use processes. Pickle blobs with custom logic to handle the client. + return (concurrent.futures.ProcessPoolExecutor, True) + elif worker_type == THREAD: + # Use threads. Pass blobs through unpickled. + return (concurrent.futures.ThreadPoolExecutor, False) + else: + raise ValueError( + "The worker_type must be google.cloud.storage.transfer_manager.PROCESS or google.cloud.storage.transfer_manager.THREAD" + ) + + +class _LazyClient: + """An object that will transform into either a cached or a new Client""" + + def __new__(cls, id, *args, **kwargs): + cached_client = _cached_clients.get(id) + if cached_client: + return cached_client + else: + cached_client = Client(*args, **kwargs) + _cached_clients[id] = cached_client + return cached_client diff --git a/google/cloud/storage/version.py b/google/cloud/storage/version.py index d962613e0..0a9aecb37 100644 --- a/google/cloud/storage/version.py +++ b/google/cloud/storage/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "2.7.0" +__version__ = "2.8.0" diff --git a/noxfile.py b/noxfile.py index 336520412..3b67a5712 100644 --- a/noxfile.py +++ b/noxfile.py @@ -29,7 +29,7 @@ DEFAULT_PYTHON_VERSION = "3.8" SYSTEM_TEST_PYTHON_VERSIONS = ["3.8"] -UNIT_TEST_PYTHON_VERSIONS = ["3.7", "3.8", "3.9", "3.10"] +UNIT_TEST_PYTHON_VERSIONS = ["3.7", "3.8", "3.9", "3.10", "3.11"] CONFORMANCE_TEST_PYTHON_VERSIONS = ["3.8"] _DEFAULT_STORAGE_HOST = "https://storage.googleapis.com" @@ -137,9 +137,8 @@ def system(session): session.skip("System tests were not found") # Use pre-release gRPC for system tests. - # TODO: Revert #845 once grpc issue fix is released. - # Pending grpc/grpc#30642 and grpc/grpc#30651. - session.install("--pre", "grpcio!=1.49.0rc1") + # TODO: Remove ban of 1.52.0rc1 once grpc/grpc#31885 is resolved. + session.install("--pre", "grpcio!=1.52.0rc1") # Install all test dependencies, then install this package into the # virtualenv's dist-packages. diff --git a/owlbot.py b/owlbot.py index 8d0b89d14..50a787f34 100644 --- a/owlbot.py +++ b/owlbot.py @@ -26,7 +26,6 @@ templated_files = common.py_library( cov_level=100, split_system_tests=True, - unit_test_python_versions=["3.7", "3.8", "3.9", "3.10"], system_test_external_dependencies=[ "google-cloud-iam", "google-cloud-pubsub < 2.0.0", @@ -46,6 +45,7 @@ "docs/multiprocessing.rst", "noxfile.py", "CONTRIBUTING.rst", + "README.rst", ".kokoro/samples/python3.6", # remove python 3.6 support ".github/workflows", # exclude gh actions as credentials are needed for tests ".github/release-please.yml", # special support for a python2 branch in this repo diff --git a/samples/snippets/encryption_test.py b/samples/snippets/encryption_test.py index 536c5d334..5a5eb7b2d 100644 --- a/samples/snippets/encryption_test.py +++ b/samples/snippets/encryption_test.py @@ -47,15 +47,18 @@ def test_generate_encryption_key(capsys): def test_upload_encrypted_blob(): + blob_name = f"test_upload_encrypted_{uuid.uuid4().hex}" with tempfile.NamedTemporaryFile() as source_file: source_file.write(b"test") storage_upload_encrypted_file.upload_encrypted_blob( BUCKET, source_file.name, - "test_encrypted_upload_blob", + blob_name, TEST_ENCRYPTION_KEY, ) + bucket = storage.Client().bucket(BUCKET) + bucket.delete_blob(blob_name) @pytest.fixture(scope="module") diff --git a/samples/snippets/noxfile.py b/samples/snippets/noxfile.py index f5c32b227..7c8a63994 100644 --- a/samples/snippets/noxfile.py +++ b/samples/snippets/noxfile.py @@ -89,7 +89,7 @@ def get_pytest_env_vars() -> Dict[str, str]: # DO NOT EDIT - automatically generated. # All versions used to test samples. -ALL_VERSIONS = ["3.7", "3.8", "3.9", "3.10"] +ALL_VERSIONS = ["3.7", "3.8", "3.9", "3.10", "3.11"] # Any default versions that should be ignored. IGNORED_VERSIONS = TEST_CONFIG["ignored_versions"] diff --git a/samples/snippets/requirements-test.txt b/samples/snippets/requirements-test.txt index 4e8a7389f..2e805e1f8 100644 --- a/samples/snippets/requirements-test.txt +++ b/samples/snippets/requirements-test.txt @@ -1,3 +1,3 @@ -pytest==7.2.0 -mock==4.0.3 +pytest==7.2.2 +mock==5.0.1 backoff==2.2.1 \ No newline at end of file diff --git a/samples/snippets/requirements.txt b/samples/snippets/requirements.txt index d5554b4d9..1e06ff2d2 100644 --- a/samples/snippets/requirements.txt +++ b/samples/snippets/requirements.txt @@ -1,4 +1,4 @@ -google-cloud-pubsub==2.13.11 -google-cloud-storage==2.6.0 +google-cloud-pubsub==2.15.2 +google-cloud-storage==2.7.0 pandas===1.3.5; python_version == '3.7' -pandas==1.5.2; python_version >= '3.8' +pandas==1.5.3; python_version >= '3.8' diff --git a/samples/snippets/snippets_test.py b/samples/snippets/snippets_test.py index 4ad0dc1a0..57751be60 100644 --- a/samples/snippets/snippets_test.py +++ b/samples/snippets/snippets_test.py @@ -235,14 +235,16 @@ def test_upload_blob_from_stream(test_bucket, capsys): def test_upload_blob_with_kms(test_bucket): + blob_name = f"test_upload_with_kms_{uuid.uuid4().hex}" with tempfile.NamedTemporaryFile() as source_file: source_file.write(b"test") storage_upload_with_kms_key.upload_blob_with_kms( - test_bucket.name, source_file.name, "test_upload_blob_encrypted", KMS_KEY + test_bucket.name, source_file.name, blob_name, KMS_KEY, ) bucket = storage.Client().bucket(test_bucket.name) - kms_blob = bucket.get_blob("test_upload_blob_encrypted") + kms_blob = bucket.get_blob(blob_name) assert kms_blob.kms_key_name.startswith(KMS_KEY) + test_bucket.delete_blob(blob_name) def test_async_upload(bucket, capsys): @@ -390,7 +392,7 @@ def test_move_blob(test_bucket_create, test_blob): print(f"test_move_blob not found in bucket {test_bucket_create.name}") storage_move_file.move_blob( - bucket.name, test_blob.name, test_bucket_create.name, "test_move_blob" + bucket.name, test_blob.name, test_bucket_create.name, "test_move_blob", ) assert test_bucket_create.get_blob("test_move_blob") is not None @@ -406,7 +408,7 @@ def test_copy_blob(test_blob): pass storage_copy_file.copy_blob( - bucket.name, test_blob.name, bucket.name, "test_copy_blob" + bucket.name, test_blob.name, bucket.name, "test_copy_blob", ) assert bucket.get_blob("test_copy_blob") is not None @@ -545,7 +547,7 @@ def test_define_bucket_website_configuration(test_bucket): def test_object_get_kms_key(test_bucket): with tempfile.NamedTemporaryFile() as source_file: storage_upload_with_kms_key.upload_blob_with_kms( - test_bucket.name, source_file.name, "test_upload_blob_encrypted", KMS_KEY + test_bucket.name, source_file.name, "test_upload_blob_encrypted", KMS_KEY, ) kms_key = storage_object_get_kms_key.object_get_kms_key( test_bucket.name, "test_upload_blob_encrypted" @@ -562,7 +564,7 @@ def test_storage_compose_file(test_bucket): with tempfile.NamedTemporaryFile() as dest_file: destination = storage_compose_file.compose_file( - test_bucket.name, source_files[0], source_files[1], dest_file.name + test_bucket.name, source_files[0], source_files[1], dest_file.name, ) composed = destination.download_as_string() @@ -602,7 +604,7 @@ def test_change_default_storage_class(test_bucket, capsys): def test_change_file_storage_class(test_blob, capsys): blob = storage_change_file_storage_class.change_file_storage_class( - test_blob.bucket.name, test_blob.name + test_blob.bucket.name, test_blob.name, ) out, _ = capsys.readouterr() assert f"Blob {blob.name} in bucket {blob.bucket.name}" in out diff --git a/samples/snippets/storage_change_file_storage_class.py b/samples/snippets/storage_change_file_storage_class.py index d5dda56a7..a976ac8a4 100644 --- a/samples/snippets/storage_change_file_storage_class.py +++ b/samples/snippets/storage_change_file_storage_class.py @@ -27,9 +27,17 @@ def change_file_storage_class(bucket_name, blob_name): storage_client = storage.Client() - bucket = storage_client.get_bucket(bucket_name) - blob = bucket.get_blob(blob_name) - blob.update_storage_class("NEARLINE") + bucket = storage_client.bucket(bucket_name) + blob = bucket.blob(blob_name) + generation_match_precondition = None + + # Optional: set a generation-match precondition to avoid potential race + # conditions and data corruptions. The request is aborted if the + # object's generation number does not match your precondition. + blob.reload() # Fetch blob metadata to use in generation_match_precondition. + generation_match_precondition = blob.generation + + blob.update_storage_class("NEARLINE", if_generation_match=generation_match_precondition) print( "Blob {} in bucket {} had its storage class set to {}".format( diff --git a/samples/snippets/storage_compose_file.py b/samples/snippets/storage_compose_file.py index 2c1443f22..e67391272 100644 --- a/samples/snippets/storage_compose_file.py +++ b/samples/snippets/storage_compose_file.py @@ -32,9 +32,19 @@ def compose_file(bucket_name, first_blob_name, second_blob_name, destination_blo destination = bucket.blob(destination_blob_name) destination.content_type = "text/plain" - # sources is a list of Blob instances, up to the max of 32 instances per request - sources = [bucket.get_blob(first_blob_name), bucket.get_blob(second_blob_name)] - destination.compose(sources) + # Note sources is a list of Blob instances, up to the max of 32 instances per request + sources = [bucket.blob(first_blob_name), bucket.blob(second_blob_name)] + + # Optional: set a generation-match precondition to avoid potential race conditions + # and data corruptions. The request to compose is aborted if the object's + # generation number does not match your precondition. For a destination + # object that does not yet exist, set the if_generation_match precondition to 0. + # If the destination object already exists in your bucket, set instead a + # generation-match precondition using its generation number. + # There is also an `if_source_generation_match` parameter, which is not used in this example. + destination_generation_match_precondition = 0 + + destination.compose(sources, if_generation_match=destination_generation_match_precondition) print( "New composite object {} in the bucket {} was created by combining {} and {}".format( diff --git a/samples/snippets/storage_copy_file.py b/samples/snippets/storage_copy_file.py index 5d36aa94b..b802de28b 100644 --- a/samples/snippets/storage_copy_file.py +++ b/samples/snippets/storage_copy_file.py @@ -21,7 +21,7 @@ def copy_blob( - bucket_name, blob_name, destination_bucket_name, destination_blob_name + bucket_name, blob_name, destination_bucket_name, destination_blob_name, ): """Copies a blob from one bucket to another with a new name.""" # bucket_name = "your-bucket-name" @@ -35,8 +35,17 @@ def copy_blob( source_blob = source_bucket.blob(blob_name) destination_bucket = storage_client.bucket(destination_bucket_name) + # Optional: set a generation-match precondition to avoid potential race conditions + # and data corruptions. The request to copy is aborted if the object's + # generation number does not match your precondition. For a destination + # object that does not yet exist, set the if_generation_match precondition to 0. + # If the destination object already exists in your bucket, set instead a + # generation-match precondition using its generation number. + # There is also an `if_source_generation_match` parameter, which is not used in this example. + destination_generation_match_precondition = 0 + blob_copy = source_bucket.copy_blob( - source_blob, destination_bucket, destination_blob_name + source_blob, destination_bucket, destination_blob_name, if_generation_match=destination_generation_match_precondition, ) print( diff --git a/samples/snippets/storage_copy_file_archived_generation.py b/samples/snippets/storage_copy_file_archived_generation.py index 988ebcbeb..419d8e5a3 100644 --- a/samples/snippets/storage_copy_file_archived_generation.py +++ b/samples/snippets/storage_copy_file_archived_generation.py @@ -36,13 +36,22 @@ def copy_file_archived_generation( source_blob = source_bucket.blob(blob_name) destination_bucket = storage_client.bucket(destination_bucket_name) + # Optional: set a generation-match precondition to avoid potential race conditions + # and data corruptions. The request to copy is aborted if the object's + # generation number does not match your precondition. For a destination + # object that does not yet exist, set the if_generation_match precondition to 0. + # If the destination object already exists in your bucket, set instead a + # generation-match precondition using its generation number. + destination_generation_match_precondition = 0 + + # source_generation selects a specific revision of the source object, as opposed to the latest version. blob_copy = source_bucket.copy_blob( - source_blob, destination_bucket, destination_blob_name, source_generation=generation + source_blob, destination_bucket, destination_blob_name, source_generation=generation, if_generation_match=destination_generation_match_precondition ) print( "Generation {} of the blob {} in bucket {} copied to blob {} in bucket {}.".format( - source_blob.generation, + generation, source_blob.name, source_bucket.name, blob_copy.name, diff --git a/samples/snippets/storage_delete_file.py b/samples/snippets/storage_delete_file.py index b2997c86b..427604145 100644 --- a/samples/snippets/storage_delete_file.py +++ b/samples/snippets/storage_delete_file.py @@ -29,7 +29,15 @@ def delete_blob(bucket_name, blob_name): bucket = storage_client.bucket(bucket_name) blob = bucket.blob(blob_name) - blob.delete() + generation_match_precondition = None + + # Optional: set a generation-match precondition to avoid potential race conditions + # and data corruptions. The request to delete is aborted if the object's + # generation number does not match your precondition. + blob.reload() # Fetch blob metadata to use in generation_match_precondition. + generation_match_precondition = blob.generation + + blob.delete(if_generation_match=generation_match_precondition) print(f"Blob {blob_name} deleted.") diff --git a/samples/snippets/storage_move_file.py b/samples/snippets/storage_move_file.py index a881a38ba..b2e5144d0 100644 --- a/samples/snippets/storage_move_file.py +++ b/samples/snippets/storage_move_file.py @@ -20,7 +20,7 @@ from google.cloud import storage -def move_blob(bucket_name, blob_name, destination_bucket_name, destination_blob_name): +def move_blob(bucket_name, blob_name, destination_bucket_name, destination_blob_name,): """Moves a blob from one bucket to another with a new name.""" # The ID of your GCS bucket # bucket_name = "your-bucket-name" @@ -37,8 +37,17 @@ def move_blob(bucket_name, blob_name, destination_bucket_name, destination_blob_ source_blob = source_bucket.blob(blob_name) destination_bucket = storage_client.bucket(destination_bucket_name) + # Optional: set a generation-match precondition to avoid potential race conditions + # and data corruptions. The request is aborted if the object's + # generation number does not match your precondition. For a destination + # object that does not yet exist, set the if_generation_match precondition to 0. + # If the destination object already exists in your bucket, set instead a + # generation-match precondition using its generation number. + # There is also an `if_source_generation_match` parameter, which is not used in this example. + destination_generation_match_precondition = 0 + blob_copy = source_bucket.copy_blob( - source_blob, destination_bucket, destination_blob_name + source_blob, destination_bucket, destination_blob_name, if_generation_match=destination_generation_match_precondition, ) source_bucket.delete_blob(blob_name) diff --git a/samples/snippets/storage_object_csek_to_cmek.py b/samples/snippets/storage_object_csek_to_cmek.py index 9d4d710bf..9a915f08d 100644 --- a/samples/snippets/storage_object_csek_to_cmek.py +++ b/samples/snippets/storage_object_csek_to_cmek.py @@ -33,12 +33,22 @@ def object_csek_to_cmek(bucket_name, blob_name, encryption_key, kms_key_name): current_encryption_key = base64.b64decode(encryption_key) source_blob = bucket.blob(blob_name, encryption_key=current_encryption_key) - destination_blob = bucket.blob(blob_name, kms_key_name=kms_key_name) - token, rewritten, total = destination_blob.rewrite(source_blob) + generation_match_precondition = None + token = None + + # Optional: set a generation-match precondition to avoid potential race conditions + # and data corruptions. The request to rewrite is aborted if the object's + # generation number does not match your precondition. + source_blob.reload() # Fetch blob metadata to use in generation_match_precondition. + generation_match_precondition = source_blob.generation - while token is not None: - token, rewritten, total = destination_blob.rewrite(source_blob, token=token) + while True: + token, bytes_rewritten, total_bytes = destination_blob.rewrite( + source_blob, token=token, if_generation_match=generation_match_precondition + ) + if token is None: + break print( "Blob {} in bucket {} is now managed by the KMS key {} instead of a customer-supplied encryption key".format( diff --git a/samples/snippets/storage_release_event_based_hold.py b/samples/snippets/storage_release_event_based_hold.py index 1db637cd9..6b4a2ccb5 100644 --- a/samples/snippets/storage_release_event_based_hold.py +++ b/samples/snippets/storage_release_event_based_hold.py @@ -29,9 +29,16 @@ def release_event_based_hold(bucket_name, blob_name): storage_client = storage.Client() bucket = storage_client.bucket(bucket_name) blob = bucket.blob(blob_name) + metageneration_match_precondition = None + + # Optional: set a metageneration-match precondition to avoid potential race + # conditions and data corruptions. The request to patch is aborted if the + # object's metageneration does not match your precondition. + blob.reload() # Fetch blob metadata to use in metageneration_match_precondition. + metageneration_match_precondition = blob.metageneration blob.event_based_hold = False - blob.patch() + blob.patch(if_metageneration_match=metageneration_match_precondition) print(f"Event based hold was released for {blob_name}") diff --git a/samples/snippets/storage_release_temporary_hold.py b/samples/snippets/storage_release_temporary_hold.py index 02a6ca96c..64c7607c1 100644 --- a/samples/snippets/storage_release_temporary_hold.py +++ b/samples/snippets/storage_release_temporary_hold.py @@ -29,9 +29,16 @@ def release_temporary_hold(bucket_name, blob_name): storage_client = storage.Client() bucket = storage_client.bucket(bucket_name) blob = bucket.blob(blob_name) + metageneration_match_precondition = None + + # Optional: set a metageneration-match precondition to avoid potential race + # conditions and data corruptions. The request to patch is aborted if the + # object's metageneration does not match your precondition. + blob.reload() # Fetch blob metadata to use in metageneration_match_precondition. + metageneration_match_precondition = blob.metageneration blob.temporary_hold = False - blob.patch() + blob.patch(if_metageneration_match=metageneration_match_precondition) print("Temporary hold was release for #{blob_name}") diff --git a/samples/snippets/storage_rotate_encryption_key.py b/samples/snippets/storage_rotate_encryption_key.py index 828b7d5ef..174947b84 100644 --- a/samples/snippets/storage_rotate_encryption_key.py +++ b/samples/snippets/storage_rotate_encryption_key.py @@ -42,12 +42,18 @@ def rotate_encryption_key( destination_blob = bucket.blob( blob_name, encryption_key=new_encryption_key ) - + generation_match_precondition = None token = None + # Optional: set a generation-match precondition to avoid potential race conditions + # and data corruptions. The request to rewrite is aborted if the object's + # generation number does not match your precondition. + source_blob.reload() # Fetch blob metadata to use in generation_match_precondition. + generation_match_precondition = source_blob.generation + while True: token, bytes_rewritten, total_bytes = destination_blob.rewrite( - source_blob, token=token + source_blob, token=token, if_generation_match=generation_match_precondition ) if token is None: break diff --git a/samples/snippets/storage_set_event_based_hold.py b/samples/snippets/storage_set_event_based_hold.py index e04ed7552..76f7fd7ee 100644 --- a/samples/snippets/storage_set_event_based_hold.py +++ b/samples/snippets/storage_set_event_based_hold.py @@ -28,9 +28,16 @@ def set_event_based_hold(bucket_name, blob_name): storage_client = storage.Client() bucket = storage_client.bucket(bucket_name) blob = bucket.blob(blob_name) + metageneration_match_precondition = None + + # Optional: set a metageneration-match precondition to avoid potential race + # conditions and data corruptions. The request to patch is aborted if the + # object's metageneration does not match your precondition. + blob.reload() # Fetch blob metadata to use in metageneration_match_precondition. + metageneration_match_precondition = blob.metageneration blob.event_based_hold = True - blob.patch() + blob.patch(if_metageneration_match=metageneration_match_precondition) print(f"Event based hold was set for {blob_name}") diff --git a/samples/snippets/storage_set_metadata.py b/samples/snippets/storage_set_metadata.py index 90b6838c0..6a4a9fb9e 100644 --- a/samples/snippets/storage_set_metadata.py +++ b/samples/snippets/storage_set_metadata.py @@ -28,9 +28,16 @@ def set_blob_metadata(bucket_name, blob_name): storage_client = storage.Client() bucket = storage_client.bucket(bucket_name) blob = bucket.get_blob(blob_name) + metageneration_match_precondition = None + + # Optional: set a metageneration-match precondition to avoid potential race + # conditions and data corruptions. The request to patch is aborted if the + # object's metageneration does not match your precondition. + metageneration_match_precondition = blob.metageneration + metadata = {'color': 'Red', 'name': 'Test'} blob.metadata = metadata - blob.patch() + blob.patch(if_metageneration_match=metageneration_match_precondition) print(f"The metadata for the blob {blob.name} is {blob.metadata}") diff --git a/samples/snippets/storage_set_temporary_hold.py b/samples/snippets/storage_set_temporary_hold.py index edeb3c578..a91521bcc 100644 --- a/samples/snippets/storage_set_temporary_hold.py +++ b/samples/snippets/storage_set_temporary_hold.py @@ -28,9 +28,16 @@ def set_temporary_hold(bucket_name, blob_name): storage_client = storage.Client() bucket = storage_client.bucket(bucket_name) blob = bucket.blob(blob_name) + metageneration_match_precondition = None + + # Optional: set a metageneration-match precondition to avoid potential race + # conditions and data corruptions. The request to patch is aborted if the + # object's metageneration does not match your precondition. + blob.reload() # Fetch blob metadata to use in metageneration_match_precondition. + metageneration_match_precondition = blob.metageneration blob.temporary_hold = True - blob.patch() + blob.patch(if_metageneration_match=metageneration_match_precondition) print("Temporary hold was set for #{blob_name}") diff --git a/samples/snippets/storage_upload_encrypted_file.py b/samples/snippets/storage_upload_encrypted_file.py index 5f4987238..08f58154e 100644 --- a/samples/snippets/storage_upload_encrypted_file.py +++ b/samples/snippets/storage_upload_encrypted_file.py @@ -36,6 +36,10 @@ def upload_encrypted_blob( The file will be encrypted by Google Cloud Storage and only retrievable using the provided encryption key. """ + # bucket_name = "your-bucket-name" + # source_file_name = "local/path/to/file" + # destination_blob_name = "storage-object-name" + # base64_encryption_key = "TIbv/fjexq+VmtXzAlc63J4z5kFmWJ6NdAPQulQBT7g=" storage_client = storage.Client() bucket = storage_client.bucket(bucket_name) @@ -48,7 +52,15 @@ def upload_encrypted_blob( destination_blob_name, encryption_key=encryption_key ) - blob.upload_from_filename(source_file_name) + # Optional: set a generation-match precondition to avoid potential race conditions + # and data corruptions. The request to upload is aborted if the object's + # generation number does not match your precondition. For a destination + # object that does not yet exist, set the if_generation_match precondition to 0. + # If the destination object already exists in your bucket, set instead a + # generation-match precondition using its generation number. + generation_match_precondition = 0 + + blob.upload_from_filename(source_file_name, if_generation_match=generation_match_precondition) print( f"File {source_file_name} uploaded to {destination_blob_name}." diff --git a/samples/snippets/storage_upload_file.py b/samples/snippets/storage_upload_file.py index 8e7d98630..1e7ceda5e 100644 --- a/samples/snippets/storage_upload_file.py +++ b/samples/snippets/storage_upload_file.py @@ -33,7 +33,15 @@ def upload_blob(bucket_name, source_file_name, destination_blob_name): bucket = storage_client.bucket(bucket_name) blob = bucket.blob(destination_blob_name) - blob.upload_from_filename(source_file_name) + # Optional: set a generation-match precondition to avoid potential race conditions + # and data corruptions. The request to upload is aborted if the object's + # generation number does not match your precondition. For a destination + # object that does not yet exist, set the if_generation_match precondition to 0. + # If the destination object already exists in your bucket, set instead a + # generation-match precondition using its generation number. + generation_match_precondition = 0 + + blob.upload_from_filename(source_file_name, if_generation_match=generation_match_precondition) print( f"File {source_file_name} uploaded to {destination_blob_name}." diff --git a/samples/snippets/storage_upload_with_kms_key.py b/samples/snippets/storage_upload_with_kms_key.py index e83c10aea..6e8fe0394 100644 --- a/samples/snippets/storage_upload_with_kms_key.py +++ b/samples/snippets/storage_upload_with_kms_key.py @@ -21,7 +21,7 @@ def upload_blob_with_kms( - bucket_name, source_file_name, destination_blob_name, kms_key_name + bucket_name, source_file_name, destination_blob_name, kms_key_name, ): """Uploads a file to the bucket, encrypting it with the given KMS key.""" # bucket_name = "your-bucket-name" @@ -32,7 +32,16 @@ def upload_blob_with_kms( storage_client = storage.Client() bucket = storage_client.bucket(bucket_name) blob = bucket.blob(destination_blob_name, kms_key_name=kms_key_name) - blob.upload_from_filename(source_file_name) + + # Optional: set a generation-match precondition to avoid potential race conditions + # and data corruptions. The request to upload is aborted if the object's + # generation number does not match your precondition. For a destination + # object that does not yet exist, set the if_generation_match precondition to 0. + # If the destination object already exists in your bucket, set instead a + # generation-match precondition using its generation number. + generation_match_precondition = 0 + + blob.upload_from_filename(source_file_name, if_generation_match=generation_match_precondition) print( "File {} uploaded to {} with encryption key {}.".format( diff --git a/setup.py b/setup.py index 8686745f7..e2b5cc7a4 100644 --- a/setup.py +++ b/setup.py @@ -81,6 +81,7 @@ "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", "Operating System :: OS Independent", "Topic :: Internet", ], diff --git a/tests/perf/README.md b/tests/perf/README.md index d530b12d9..08e778f51 100644 --- a/tests/perf/README.md +++ b/tests/perf/README.md @@ -18,12 +18,20 @@ $ python3 benchmarking.py --num_samples 10000 --max_size 16384 | Parameter | Description | Possible values | Default | | --------- | ----------- | --------------- |:-------:| -| --min_size | minimum object size in bytes | any positive integer | `5120` (5 KiB) | -| --max_size | maximum object size in bytes | any positive integer | `2147483648` (2 GiB) | -| --num_samples | number of W1R3 iterations | any positive integer | `1000` | -| --r | bucket region for benchmarks | any GCS region | `US` | -| --p | number of processes (multiprocessing enabled) | any positive integer | 16 (recommend not to exceed 16) | -| --o | file to output results to | any file path | `benchmarking.csv` | +| --project | GCP project identifier | a project id| * | +| --api | API to use | only JSON is currently supported in python benchmarking | `JSON` | +| --output_type | output results as csv records or cloud monitoring | `csv`, `cloud-monitoring` | `cloud-monitoring` | +| --object_size | object size in bytes; can be a range min..max | string | `1048576` (1 MiB) | +| --range_read_size | size of the range to read in bytes | any positive integer
<=0 reads the full object | `0` | +| --minimum_read_offset | minimum offset for the start of the range to be read in bytes | any integer >0 | `0` | +| --maximum_read_offset | maximum offset for the start of the range to be read in bytes | any integer >0 | `0` | +| --samples | number of W1R3 iterations | any positive integer | `8000` | +| --bucket | storage bucket name | a bucket name | `pybench` | +| --bucket_region | bucket region for benchmarks | any GCS region | `US-WEST1` | +| --workers | number of processes (multiprocessing enabled) | any positive integer | 16 (recommend not to exceed 16) | +| --test_type | test type to run benchmarking | `w1r3`, `range` | `w1r3` | +| --output_file | file to output results to | any file path | `output_bench.csv` | +| --tmp_dir | temp directory path on file system | any file path | `tm-perf-metrics` | ## Workload definition and CSV headers diff --git a/tests/perf/_perf_utils.py b/tests/perf/_perf_utils.py new file mode 100644 index 000000000..d1e625f8e --- /dev/null +++ b/tests/perf/_perf_utils.py @@ -0,0 +1,216 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Performance benchmarking helper methods. This is not an officially supported Google product.""" + +import csv +import logging +import os +import random +import shutil +import time +import uuid + +from google.cloud import storage + + +##### DEFAULTS & CONSTANTS ##### +HEADER = [ + "Op", + "ObjectSize", + "AppBufferSize", + "LibBufferSize", + "Crc32cEnabled", + "MD5Enabled", + "ApiName", + "ElapsedTimeUs", + "CpuTimeUs", + "Status", +] +CHECKSUM = ["md5", "crc32c", None] +TIMESTAMP = time.strftime("%Y%m%d-%H%M%S") +DEFAULT_API = "JSON" +DEFAULT_BUCKET_NAME = f"pybench{TIMESTAMP}" +DEFAULT_BUCKET_REGION = "US-WEST1" +DEFAULT_OBJECT_RANGE_SIZE_BYTES = "1048576" # 1 MiB +DEFAULT_NUM_SAMPLES = 8000 +DEFAULT_NUM_PROCESSES = 16 +DEFAULT_LIB_BUFFER_SIZE = 104857600 # 100MB +DEFAULT_CHUNKSIZE = 104857600 # 100 MB https://github.com/googleapis/python-storage/blob/main/google/cloud/storage/blob.py#L139 +NOT_SUPPORTED = -1 +DEFAULT_BASE_DIR = "tm-perf-metrics" +DEFAULT_OUTPUT_FILE = f"output_bench{TIMESTAMP}.csv" +DEFAULT_CREATE_SUBDIR_PROBABILITY = 0.1 +SSB_SIZE_THRESHOLD_BYTES = 1048576 + + +##### UTILITY METHODS ##### + + +# Returns a boolean value with the provided probability. +def weighted_random_boolean(create_subdir_probability): + return random.uniform(0.0, 1.0) <= create_subdir_probability + + +# Creates a random file with the given file name, path and size. +def generate_random_file(file_name, file_path, size): + with open(os.path.join(file_path, file_name), "wb") as file_obj: + file_obj.write(os.urandom(size)) + + +# Creates a random directory structure consisting of subdirectories and random files. +# Returns an array of all the generated paths and total size in bytes of all generated files. +def generate_random_directory( + max_objects, + min_file_size, + max_file_size, + base_dir, + create_subdir_probability=DEFAULT_CREATE_SUBDIR_PROBABILITY, +): + directory_info = { + "paths": [], + "total_size_in_bytes": 0, + } + + file_path = base_dir + os.makedirs(file_path, exist_ok=True) + for i in range(max_objects): + if weighted_random_boolean(create_subdir_probability): + file_path = f"{file_path}/{uuid.uuid4().hex}" + os.makedirs(file_path, exist_ok=True) + directory_info["paths"].append(file_path) + else: + file_name = uuid.uuid4().hex + rand_size = random.randint(min_file_size, max_file_size) + generate_random_file(file_name, file_path, rand_size) + directory_info["total_size_in_bytes"] += rand_size + directory_info["paths"].append(os.path.join(file_path, file_name)) + + return directory_info + + +def results_to_csv(res): + results = [] + for metric in HEADER: + results.append(res.get(metric, -1)) + return results + + +def convert_to_csv(filename, results, workers): + with open(filename, "w") as file: + writer = csv.writer(file) + writer.writerow(HEADER) + # Benchmarking main script uses Multiprocessing Pool.map(), + # thus results is structured as List[List[Dict[str, any]]]. + for result in results: + for row in result: + writer.writerow(results_to_csv(row)) + + +def convert_to_cloud_monitoring(bucket_name, results, workers): + # Benchmarking main script uses Multiprocessing Pool.map(), + # thus results is structured as List[List[Dict[str, any]]]. + for result in results: + for res in result: + range_read_size = res.get("RangeReadSize", 0) + object_size = res.get("ObjectSize") + elapsed_time_us = res.get("ElapsedTimeUs") + status = res.get("Status").pop() # convert ["OK"] --> "OK" + + # Handle range reads and calculate throughput using range_read_size. + if range_read_size > 0: + size = range_read_size + else: + size = object_size + + # If size is greater than the defined threshold, report in MiB/s, otherwise report in KiB/s. + if size >= SSB_SIZE_THRESHOLD_BYTES: + throughput = (size / 1024 / 1024) / (elapsed_time_us / 1_000_000) + else: + throughput = (size / 1024) / (elapsed_time_us / 1_000_000) + + cloud_monitoring_output = ( + "throughput{" + + "library=python-storage," + + "api={},".format(res.get("ApiName")) + + "op={},".format(res.get("Op")) + + "workers={},".format(workers) + + "object_size={},".format(object_size) + + "transfer_offset={},".format(res.get("TransferOffset", 0)) + + "transfer_size={},".format(res.get("TransferSize", object_size)) + + "app_buffer_size={},".format(res.get("AppBufferSize")) + + "chunksize={},".format(res.get("TransferSize", object_size)) + + "crc32c_enabled={},".format(res.get("Crc32cEnabled")) + + "md5_enabled={},".format(res.get("MD5Enabled")) + + "cpu_time_us={},".format(res.get("CpuTimeUs")) + + "peer=''," + + f"bucket_name={bucket_name}," + + "retry_count=''," + + f"status={status}" + + "}" + f"{throughput}" + ) + + print(cloud_monitoring_output) + + +def cleanup_directory_tree(directory): + """Clean up directory tree on disk.""" + try: + shutil.rmtree(directory) + except Exception as e: + logging.exception(f"Caught an exception while deleting local directory\n {e}") + + +def cleanup_file(file_path): + """Clean up local file on disk.""" + try: + os.remove(file_path) + except Exception as e: + logging.exception(f"Caught an exception while deleting local file\n {e}") + + +def get_bucket_instance(bucket_name): + client = storage.Client() + bucket = client.bucket(bucket_name) + if not bucket.exists(): + client.create_bucket(bucket) + return bucket + + +def cleanup_bucket(bucket): + # Delete blobs first as the bucket may contain more than 256 blobs. + try: + blobs = bucket.list_blobs() + for blob in blobs: + blob.delete() + except Exception as e: + logging.exception(f"Caught an exception while deleting blobs\n {e}") + # Delete bucket. + try: + bucket.delete(force=True) + except Exception as e: + logging.exception(f"Caught an exception while deleting bucket\n {e}") + + +def get_min_max_size(object_size): + # Object size accepts a single value in bytes or a range in bytes min..max + if object_size.find("..") < 0: + min_size = int(object_size) + max_size = int(object_size) + else: + split_sizes = object_size.split("..") + min_size = int(split_sizes[0]) + max_size = int(split_sizes[1]) + return min_size, max_size diff --git a/tests/perf/benchmarking.py b/tests/perf/benchmarking.py index 2389b00e6..537bacd12 100644 --- a/tests/perf/benchmarking.py +++ b/tests/perf/benchmarking.py @@ -12,262 +12,155 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Performance benchmarking script. This is not an officially supported Google product.""" +"""Performance benchmarking main script. This is not an officially supported Google product.""" import argparse -import csv import logging import multiprocessing -import os -import random -import time -import uuid - -from functools import partial, update_wrapper from google.cloud import storage +import _perf_utils as _pu +import profile_w1r3 as w1r3 -##### DEFAULTS & CONSTANTS ##### -HEADER = [ - "Op", - "ObjectSize", - "AppBufferSize", - "LibBufferSize", - "Crc32cEnabled", - "MD5Enabled", - "ApiName", - "ElapsedTimeUs", - "CpuTimeUs", - "Status", - "RunID", -] -CHECKSUM = ["md5", "crc32c", None] -TIMESTAMP = time.strftime("%Y%m%d-%H%M%S") -DEFAULT_API = "JSON" -DEFAULT_BUCKET_LOCATION = "US" -DEFAULT_MIN_SIZE = 5120 # 5 KiB -DEFAULT_MAX_SIZE = 2147483648 # 2 GiB -DEFAULT_NUM_SAMPLES = 1000 -DEFAULT_NUM_PROCESSES = 16 -DEFAULT_LIB_BUFFER_SIZE = 104857600 # https://github.com/googleapis/python-storage/blob/main/google/cloud/storage/blob.py#L135 -NOT_SUPPORTED = -1 - - -def log_performance(func): - """Log latency and throughput output per operation call.""" - # Holds benchmarking results for each operation - res = { - "ApiName": DEFAULT_API, - "RunID": TIMESTAMP, - "CpuTimeUs": NOT_SUPPORTED, - "AppBufferSize": NOT_SUPPORTED, - "LibBufferSize": DEFAULT_LIB_BUFFER_SIZE, - } - - try: - elapsed_time = func() - except Exception as e: - logging.exception( - f"Caught an exception while running operation {func.__name__}\n {e}" - ) - res["Status"] = ["FAIL"] - elapsed_time = NOT_SUPPORTED - else: - res["Status"] = ["OK"] - - checksum = func.keywords.get("checksum") - num = func.keywords.get("num", None) - res["ElapsedTimeUs"] = elapsed_time - res["ObjectSize"] = func.keywords.get("size") - res["Crc32cEnabled"] = checksum == "crc32c" - res["MD5Enabled"] = checksum == "md5" - res["Op"] = func.__name__ - if res["Op"] == "READ": - res["Op"] += f"[{num}]" - - return [ - res["Op"], - res["ObjectSize"], - res["AppBufferSize"], - res["LibBufferSize"], - res["Crc32cEnabled"], - res["MD5Enabled"], - res["ApiName"], - res["ElapsedTimeUs"], - res["CpuTimeUs"], - res["Status"], - res["RunID"], - ] - - -def WRITE(bucket, blob_name, checksum, size, **kwargs): - """Perform an upload and return latency.""" - blob = bucket.blob(blob_name) - file_path = f"{os.getcwd()}/{uuid.uuid4().hex}" - # Create random file locally on disk - with open(file_path, "wb") as file_obj: - file_obj.write(os.urandom(size)) - - start_time = time.monotonic_ns() - blob.upload_from_filename(file_path, checksum=checksum, if_generation_match=0) - end_time = time.monotonic_ns() - - elapsed_time = round( - (end_time - start_time) / 1000 - ) # convert nanoseconds to microseconds - - # Clean up local file - cleanup_file(file_path) - - return elapsed_time - - -def READ(bucket, blob_name, checksum, **kwargs): - """Perform a download and return latency.""" - blob = bucket.blob(blob_name) - if not blob.exists(): - raise Exception("Blob does not exist. Previous WRITE failed.") - - file_path = f"{os.getcwd()}/{blob_name}" - with open(file_path, "wb") as file_obj: - start_time = time.monotonic_ns() - blob.download_to_file(file_obj, checksum=checksum) - end_time = time.monotonic_ns() - - elapsed_time = round( - (end_time - start_time) / 1000 - ) # convert nanoseconds to microseconds - - # Clean up local file - cleanup_file(file_path) - - return elapsed_time - - -def cleanup_file(file_path): - """Clean up local file on disk.""" - try: - os.remove(file_path) - except Exception as e: - logging.exception(f"Caught an exception while deleting local file\n {e}") - - -def _wrapped_partial(func, *args, **kwargs): - """Helper method to create partial and propagate function name and doc from original function.""" - partial_func = partial(func, *args, **kwargs) - update_wrapper(partial_func, func) - return partial_func - - -def _generate_func_list(bucket_name, min_size, max_size): - """Generate Write-1-Read-3 workload.""" - # generate randmon size in bytes using a uniform distribution - size = random.randrange(min_size, max_size) - blob_name = f"{TIMESTAMP}-{uuid.uuid4().hex}" - - # generate random checksumming type: md5, crc32c or None - idx_checksum = random.choice([0, 1, 2]) - checksum = CHECKSUM[idx_checksum] - - func_list = [ - _wrapped_partial( - WRITE, - storage.Client().bucket(bucket_name), - blob_name, - size=size, - checksum=checksum, - ), - *[ - _wrapped_partial( - READ, - storage.Client().bucket(bucket_name), - blob_name, - size=size, - checksum=checksum, - num=i, - ) - for i in range(3) - ], - ] - return func_list - - -def benchmark_runner(args): - """Run benchmarking iterations.""" - results = [] - for func in _generate_func_list(args.b, args.min_size, args.max_size): - results.append(log_performance(func)) - return results +##### PROFILE BENCHMARKING TEST TYPES ##### +PROFILE_WRITE_ONE_READ_THREE = "w1r3" +PROFILE_RANGE_READ = "range" def main(args): - # Create a storage bucket to run benchmarking - client = storage.Client() - if not client.bucket(args.b).exists(): - bucket = client.create_bucket(args.b, location=args.r) - - # Launch benchmark_runner using multiprocessing - p = multiprocessing.Pool(args.p) - pool_output = p.map(benchmark_runner, [args for _ in range(args.num_samples)]) + logging.info("Start benchmarking main script") + # Create a storage bucket to run benchmarking. + if args.project is not None: + client = storage.Client(project=args.project) + else: + client = storage.Client() + + bucket = client.bucket(args.bucket) + if not bucket.exists(): + bucket = client.create_bucket(bucket, location=args.bucket_region) + + # Define test type and number of processes to run benchmarking. + # Note that transfer manager tests defaults to using 1 process. + num_processes = 1 + test_type = args.test_type + if test_type == PROFILE_WRITE_ONE_READ_THREE: + num_processes = args.workers + benchmark_runner = w1r3.run_profile_w1r3 + logging.info( + f"A total of {num_processes} processes are created to run benchmarking {test_type}" + ) + elif test_type == PROFILE_RANGE_READ: + num_processes = args.workers + benchmark_runner = w1r3.run_profile_range_read + logging.info( + f"A total of {num_processes} processes are created to run benchmarking {test_type}" + ) - # Output to CSV file - with open(args.o, "w") as file: - writer = csv.writer(file) - writer.writerow(HEADER) - for result in pool_output: - for row in result: - writer.writerow(row) - print(f"Succesfully ran benchmarking. Please find your output log at {args.o}") + # Allow multiprocessing to speed up benchmarking tests; Defaults to 1 for no concurrency. + p = multiprocessing.Pool(num_processes) + pool_output = p.map(benchmark_runner, [args for _ in range(args.samples)]) + + # Output to Cloud Monitoring or CSV file. + output_type = args.output_type + if output_type == "cloud-monitoring": + _pu.convert_to_cloud_monitoring(args.bucket, pool_output, num_processes) + elif output_type == "csv": + _pu.convert_to_csv(args.output_file, pool_output, num_processes) + logging.info( + f"Succesfully ran benchmarking. Please find your output log at {args.output_file}" + ) - # Cleanup and delete bucket - try: - bucket.delete(force=True) - except Exception as e: - logging.exception(f"Caught an exception while deleting bucket\n {e}") + # Cleanup and delete blobs. + _pu.cleanup_bucket(bucket) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( - "--min_size", + "--project", + type=str, + default=None, + help="GCP project identifier", + ) + parser.add_argument( + "--api", + type=str, + default="JSON", + help="API to use", + ) + parser.add_argument( + "--test_type", + type=str, + default=PROFILE_WRITE_ONE_READ_THREE, + help="Benchmarking test type", + ) + parser.add_argument( + "--object_size", + type=str, + default=_pu.DEFAULT_OBJECT_RANGE_SIZE_BYTES, + help="Object size in bytes; can be a range min..max", + ) + parser.add_argument( + "--range_read_size", type=int, - default=DEFAULT_MIN_SIZE, - help="Minimum object size in bytes", + default=0, + help="Size of the range to read in bytes", ) parser.add_argument( - "--max_size", + "--minimum_read_offset", type=int, - default=DEFAULT_MAX_SIZE, - help="Maximum object size in bytes", + default=0, + help="Minimum offset for the start of the range to be read in bytes", ) parser.add_argument( - "--num_samples", + "--maximum_read_offset", type=int, - default=DEFAULT_NUM_SAMPLES, - help="Number of iterations", + default=0, + help="Maximum offset for the start of the range to be read in bytes", ) parser.add_argument( - "--p", + "--samples", type=int, - default=DEFAULT_NUM_PROCESSES, + default=_pu.DEFAULT_NUM_SAMPLES, + help="Number of samples to report", + ) + parser.add_argument( + "--workers", + type=int, + default=_pu.DEFAULT_NUM_PROCESSES, help="Number of processes- multiprocessing enabled", ) parser.add_argument( - "--r", type=str, default=DEFAULT_BUCKET_LOCATION, help="Bucket location" + "--bucket", + type=str, + default=_pu.DEFAULT_BUCKET_NAME, + help="Storage bucket name", + ) + parser.add_argument( + "--bucket_region", + type=str, + default=_pu.DEFAULT_BUCKET_REGION, + help="Bucket region", ) parser.add_argument( - "--o", + "--output_type", type=str, - default=f"benchmarking{TIMESTAMP}.csv", + default="cloud-monitoring", + help="Ouput format, csv or cloud-monitoring", + ) + parser.add_argument( + "--output_file", + type=str, + default=_pu.DEFAULT_OUTPUT_FILE, help="File to output results to", ) parser.add_argument( - "--b", + "--tmp_dir", type=str, - default=f"benchmarking{TIMESTAMP}", - help="Storage bucket name", + default=_pu.DEFAULT_BASE_DIR, + help="Temp directory path on file system", ) args = parser.parse_args() diff --git a/tests/perf/profile_w1r3.py b/tests/perf/profile_w1r3.py new file mode 100644 index 000000000..50c8b5c24 --- /dev/null +++ b/tests/perf/profile_w1r3.py @@ -0,0 +1,221 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Workload W1R3 profiling script. This is not an officially supported Google product.""" + +import logging +import os +import random +import time +import uuid + +from functools import partial, update_wrapper + +from google.cloud import storage + +import _perf_utils as _pu + + +def WRITE(bucket, blob_name, checksum, size, args, **kwargs): + """Perform an upload and return latency.""" + blob = bucket.blob(blob_name) + file_path = f"{os.getcwd()}/{uuid.uuid4().hex}" + # Create random file locally on disk + with open(file_path, "wb") as file_obj: + file_obj.write(os.urandom(size)) + + start_time = time.monotonic_ns() + blob.upload_from_filename(file_path, checksum=checksum, if_generation_match=0) + end_time = time.monotonic_ns() + + elapsed_time = round( + (end_time - start_time) / 1000 + ) # convert nanoseconds to microseconds + + # Clean up local file + _pu.cleanup_file(file_path) + + return elapsed_time + + +def READ(bucket, blob_name, checksum, args, **kwargs): + """Perform a download and return latency.""" + blob = bucket.blob(blob_name) + if not blob.exists(): + raise Exception("Blob does not exist. Previous WRITE failed.") + + range_read_size = args.range_read_size + range_read_offset = kwargs.get("range_read_offset") + # Perfor range read if range_read_size is specified, else get full object. + if range_read_size != 0: + start = range_read_offset + end = start + range_read_size - 1 + else: + start = 0 + end = -1 + + file_path = f"{os.getcwd()}/{blob_name}" + with open(file_path, "wb") as file_obj: + start_time = time.monotonic_ns() + blob.download_to_file(file_obj, checksum=checksum, start=start, end=end) + end_time = time.monotonic_ns() + + elapsed_time = round( + (end_time - start_time) / 1000 + ) # convert nanoseconds to microseconds + + # Clean up local file + _pu.cleanup_file(file_path) + + return elapsed_time + + +def _wrapped_partial(func, *args, **kwargs): + """Helper method to create partial and propagate function name and doc from original function.""" + partial_func = partial(func, *args, **kwargs) + update_wrapper(partial_func, func) + return partial_func + + +def _generate_func_list(args): + """Generate Write-1-Read-3 workload.""" + bucket_name = args.bucket + blob_name = f"{_pu.TIMESTAMP}-{uuid.uuid4().hex}" + + # parse min_size and max_size from object_size + min_size, max_size = _pu.get_min_max_size(args.object_size) + # generate randmon size in bytes using a uniform distribution + size = random.randint(min_size, max_size) + + # generate random checksumming type: md5, crc32c or None + idx_checksum = random.choice([0, 1, 2]) + checksum = _pu.CHECKSUM[idx_checksum] + + # generated random read_offset + range_read_offset = random.randint( + args.minimum_read_offset, args.maximum_read_offset + ) + + func_list = [ + _wrapped_partial( + WRITE, + storage.Client().bucket(bucket_name), + blob_name, + size=size, + checksum=checksum, + args=args, + ), + *[ + _wrapped_partial( + READ, + storage.Client().bucket(bucket_name), + blob_name, + size=size, + checksum=checksum, + args=args, + num=i, + range_read_offset=range_read_offset, + ) + for i in range(3) + ], + ] + return func_list + + +def log_performance(func, args, elapsed_time, status, failure_msg): + """Hold benchmarking results per operation call.""" + size = func.keywords.get("size") + checksum = func.keywords.get("checksum", None) + num = func.keywords.get("num", None) + range_read_size = args.range_read_size + + res = { + "Op": func.__name__, + "ElapsedTimeUs": elapsed_time, + "ApiName": args.api, + "RunID": _pu.TIMESTAMP, + "CpuTimeUs": _pu.NOT_SUPPORTED, + "AppBufferSize": _pu.NOT_SUPPORTED, + "LibBufferSize": _pu.DEFAULT_LIB_BUFFER_SIZE, + "ChunkSize": 0, + "ObjectSize": size, + "TransferSize": size, + "TransferOffset": 0, + "RangeReadSize": range_read_size, + "BucketName": args.bucket, + "Library": "python-storage", + "Crc32cEnabled": checksum == "crc32c", + "MD5Enabled": checksum == "md5", + "FailureMsg": failure_msg, + "Status": status, + } + + if res["Op"] == "READ": + res["Op"] += f"[{num}]" + + # For range reads (workload 2), record additional outputs + if range_read_size > 0: + res["TransferSize"] = range_read_size + res["TransferOffset"] = func.keywords.get("range_read_offset", 0) + + return res + + +def run_profile_w1r3(args): + """Run w1r3 benchmarking. This is a wrapper used with the main benchmarking framework.""" + results = [] + + for func in _generate_func_list(args): + failure_msg = "" + try: + elapsed_time = func() + except Exception as e: + failure_msg = ( + f"Caught an exception while running operation {func.__name__}\n {e}" + ) + logging.exception(failure_msg) + status = ["FAIL"] + elapsed_time = _pu.NOT_SUPPORTED + else: + status = ["OK"] + + res = log_performance(func, args, elapsed_time, status, failure_msg) + results.append(res) + + return results + + +def run_profile_range_read(args): + """Run range read W2 benchmarking. This is a wrapper used with the main benchmarking framework.""" + results = [] + + for func in _generate_func_list(args): + failure_msg = "" + try: + elapsed_time = func() + except Exception as e: + failure_msg = ( + f"Caught an exception while running operation {func.__name__}\n {e}" + ) + logging.exception(failure_msg) + status = ["FAIL"] + elapsed_time = _pu.NOT_SUPPORTED + else: + status = ["OK"] + + # Only measure the last read + res = log_performance(func, args, elapsed_time, status, failure_msg) + results.append(res) + + return results diff --git a/tests/system/test_blob.py b/tests/system/test_blob.py index 37e454737..2d6a76b80 100644 --- a/tests/system/test_blob.py +++ b/tests/system/test_blob.py @@ -117,6 +117,11 @@ def test_large_file_write_from_stream_w_encryption_key( _check_blob_hash(blob, info) + blob_without_key = shared_bucket.blob("LargeFile") + with tempfile.TemporaryFile() as tmp: + with pytest.raises(exceptions.BadRequest): + storage_client.download_blob_to_file(blob_without_key, tmp) + with tempfile.NamedTemporaryFile() as temp_f: with open(temp_f.name, "wb") as file_obj: storage_client.download_blob_to_file(blob, file_obj) diff --git a/tests/system/test_transfer_manager.py b/tests/system/test_transfer_manager.py index 0b639170d..bc7e0d31e 100644 --- a/tests/system/test_transfer_manager.py +++ b/tests/system/test_transfer_manager.py @@ -14,11 +14,15 @@ # limitations under the License. import tempfile +import os from google.cloud.storage import transfer_manager +from google.cloud.storage._helpers import _base64_md5hash from google.api_core import exceptions +DEADLINE = 30 + def test_upload_many(shared_bucket, file_data, blobs_to_delete): FILE_BLOB_PAIRS = [ @@ -26,7 +30,11 @@ def test_upload_many(shared_bucket, file_data, blobs_to_delete): (file_data["simple"]["path"], shared_bucket.blob("simple2")), ] - results = transfer_manager.upload_many(FILE_BLOB_PAIRS) + results = transfer_manager.upload_many( + FILE_BLOB_PAIRS, + worker_type=transfer_manager.PROCESS, + deadline=DEADLINE, + ) assert results == [None, None] blobs = shared_bucket.list_blobs() @@ -36,13 +44,19 @@ def test_upload_many(shared_bucket, file_data, blobs_to_delete): assert len(blobs_to_delete) == 2 -def test_upload_many_with_file_objs(shared_bucket, file_data, blobs_to_delete): +def test_upload_many_with_threads_and_file_objs( + shared_bucket, file_data, blobs_to_delete +): FILE_BLOB_PAIRS = [ (open(file_data["simple"]["path"], "rb"), shared_bucket.blob("simple1")), (open(file_data["simple"]["path"], "rb"), shared_bucket.blob("simple2")), ] - results = transfer_manager.upload_many(FILE_BLOB_PAIRS) + results = transfer_manager.upload_many( + FILE_BLOB_PAIRS, + worker_type=transfer_manager.THREAD, + deadline=DEADLINE, + ) assert results == [None, None] blobs = shared_bucket.list_blobs() @@ -61,7 +75,10 @@ def test_upload_many_skip_if_exists( ] results = transfer_manager.upload_many( - FILE_BLOB_PAIRS, skip_if_exists=True, raise_exception=True + FILE_BLOB_PAIRS, + skip_if_exists=True, + raise_exception=True, + deadline=DEADLINE, ) assert isinstance(results[0], exceptions.PreconditionFailed) assert results[1] is None @@ -75,10 +92,82 @@ def test_upload_many_skip_if_exists( def test_download_many(listable_bucket): blobs = list(listable_bucket.list_blobs()) - tempfiles = [tempfile.TemporaryFile(), tempfile.TemporaryFile()] - BLOB_FILE_PAIRS = zip(blobs[:2], tempfiles) - - results = transfer_manager.download_many(BLOB_FILE_PAIRS) - assert results == [None, None] - for fp in tempfiles: - assert fp.tell() != 0 + with tempfile.TemporaryDirectory() as tempdir: + filenames = [ + os.path.join(tempdir, "file_a.txt"), + os.path.join(tempdir, "file_b.txt"), + ] + BLOB_FILE_PAIRS = zip(blobs[:2], filenames) + + results = transfer_manager.download_many( + BLOB_FILE_PAIRS, + worker_type=transfer_manager.PROCESS, + deadline=DEADLINE, + ) + assert results == [None, None] + for count, filename in enumerate(filenames): + with open(filename, "rb") as fp: + assert len(fp.read()) == blobs[count].size + + +def test_download_many_with_threads_and_file_objs(listable_bucket): + blobs = list(listable_bucket.list_blobs()) + with tempfile.TemporaryFile() as file_a, tempfile.TemporaryFile() as file_b: + tempfiles = [file_a, file_b] + BLOB_FILE_PAIRS = zip(blobs[:2], tempfiles) + + results = transfer_manager.download_many( + BLOB_FILE_PAIRS, + worker_type=transfer_manager.THREAD, + deadline=DEADLINE, + ) + assert results == [None, None] + for fp in tempfiles: + assert fp.tell() != 0 + + +def test_download_chunks_concurrently(shared_bucket, file_data): + # Upload a big file + source_file = file_data["big"] + upload_blob = shared_bucket.blob("chunky_file") + upload_blob.upload_from_filename(source_file["path"]) + upload_blob.reload() + size = upload_blob.size + chunk_size = size // 32 + + # Get a fresh blob obj w/o metadata for testing purposes + download_blob = shared_bucket.blob("chunky_file") + + with tempfile.TemporaryDirectory() as tempdir: + full_filename = os.path.join(tempdir, "chunky_file_1") + transfer_manager.download_chunks_concurrently( + download_blob, + full_filename, + chunk_size=chunk_size, + deadline=DEADLINE, + ) + with open(full_filename, "rb") as file_obj: + assert _base64_md5hash(file_obj) == source_file["hash"] + + # Now test for case where last chunk is exactly 1 byte. + trailing_chunk_filename = os.path.join(tempdir, "chunky_file_2") + transfer_manager.download_chunks_concurrently( + download_blob, + trailing_chunk_filename, + chunk_size=size - 1, + deadline=DEADLINE, + ) + with open(trailing_chunk_filename, "rb") as file_obj: + assert _base64_md5hash(file_obj) == source_file["hash"] + + # Also test threaded mode. + threaded_filename = os.path.join(tempdir, "chunky_file_3") + transfer_manager.download_chunks_concurrently( + download_blob, + threaded_filename, + chunk_size=chunk_size, + deadline=DEADLINE, + worker_type=transfer_manager.THREAD, + ) + with open(threaded_filename, "rb") as file_obj: + assert _base64_md5hash(file_obj) == source_file["hash"] diff --git a/tests/unit/test_transfer_manager.py b/tests/unit/test_transfer_manager.py index f52d5471b..bdfd236b5 100644 --- a/tests/unit/test_transfer_manager.py +++ b/tests/unit/test_transfer_manager.py @@ -17,258 +17,483 @@ with pytest.warns(UserWarning): from google.cloud.storage import transfer_manager +from google.cloud.storage import Blob + from google.api_core import exceptions import os import tempfile -import unittest import mock - - -class Test_Transfer_Manager(unittest.TestCase): - def test_upload_many_with_filenames(self): - FILE_BLOB_PAIRS = [("file_a.txt", mock.Mock()), ("file_b.txt", mock.Mock())] - FAKE_CONTENT_TYPE = "text/fake" - UPLOAD_KWARGS = {"content-type": FAKE_CONTENT_TYPE} - EXPECTED_UPLOAD_KWARGS = {"if_generation_match": 0, **UPLOAD_KWARGS} - FAKE_RESULT = "nothing to see here" - - for _, blob_mock in FILE_BLOB_PAIRS: - blob_mock.upload_from_filename.return_value = FAKE_RESULT - - results = transfer_manager.upload_many( - FILE_BLOB_PAIRS, skip_if_exists=True, upload_kwargs=UPLOAD_KWARGS +import pickle + +BLOB_TOKEN_STRING = "blob token" +FAKE_CONTENT_TYPE = "text/fake" +UPLOAD_KWARGS = {"content-type": FAKE_CONTENT_TYPE} +FAKE_RESULT = "nothing to see here" +FAKE_ENCODING = "fake_gzip" +DOWNLOAD_KWARGS = {"accept-encoding": FAKE_ENCODING} +CHUNK_SIZE = 8 + + +# Used in subprocesses only, so excluded from coverage +def _validate_blob_token_in_subprocess( + maybe_pickled_blob, method_name, path_or_file, **kwargs +): # pragma: NO COVER + assert pickle.loads(maybe_pickled_blob) == BLOB_TOKEN_STRING + assert method_name.endswith("filename") + assert path_or_file.startswith("file") + assert kwargs == UPLOAD_KWARGS or kwargs == DOWNLOAD_KWARGS + return FAKE_RESULT + + +def test_upload_many_with_filenames(): + FILE_BLOB_PAIRS = [ + ("file_a.txt", mock.Mock(spec=Blob)), + ("file_b.txt", mock.Mock(spec=Blob)), + ] + EXPECTED_UPLOAD_KWARGS = {"if_generation_match": 0, **UPLOAD_KWARGS} + + for _, blob_mock in FILE_BLOB_PAIRS: + blob_mock.upload_from_filename.return_value = FAKE_RESULT + + results = transfer_manager.upload_many( + FILE_BLOB_PAIRS, + skip_if_exists=True, + upload_kwargs=UPLOAD_KWARGS, + worker_type=transfer_manager.THREAD, + ) + for (filename, mock_blob) in FILE_BLOB_PAIRS: + mock_blob.upload_from_filename.assert_any_call( + filename, **EXPECTED_UPLOAD_KWARGS ) - for (filename, mock_blob) in FILE_BLOB_PAIRS: - mock_blob.upload_from_filename.assert_any_call( - filename, **EXPECTED_UPLOAD_KWARGS - ) - for result in results: - self.assertEqual(result, FAKE_RESULT) - - def test_upload_many_with_file_objs(self): - FILE_BLOB_PAIRS = [ - (tempfile.TemporaryFile(), mock.Mock()), - (tempfile.TemporaryFile(), mock.Mock()), - ] - FAKE_CONTENT_TYPE = "text/fake" - UPLOAD_KWARGS = {"content-type": FAKE_CONTENT_TYPE} - EXPECTED_UPLOAD_KWARGS = {"if_generation_match": 0, **UPLOAD_KWARGS} - FAKE_RESULT = "nothing to see here" - - for _, blob_mock in FILE_BLOB_PAIRS: - blob_mock.upload_from_file.return_value = FAKE_RESULT - - results = transfer_manager.upload_many( - FILE_BLOB_PAIRS, skip_if_exists=True, upload_kwargs=UPLOAD_KWARGS + for result in results: + assert result == FAKE_RESULT + + +def test_upload_many_with_file_objs(): + FILE_BLOB_PAIRS = [ + (tempfile.TemporaryFile(), mock.Mock(spec=Blob)), + (tempfile.TemporaryFile(), mock.Mock(spec=Blob)), + ] + EXPECTED_UPLOAD_KWARGS = {"if_generation_match": 0, **UPLOAD_KWARGS} + + for _, blob_mock in FILE_BLOB_PAIRS: + blob_mock.upload_from_file.return_value = FAKE_RESULT + + results = transfer_manager.upload_many( + FILE_BLOB_PAIRS, + skip_if_exists=True, + upload_kwargs=UPLOAD_KWARGS, + worker_type=transfer_manager.THREAD, + ) + for (file, mock_blob) in FILE_BLOB_PAIRS: + mock_blob.upload_from_file.assert_any_call(file, **EXPECTED_UPLOAD_KWARGS) + for result in results: + assert result == FAKE_RESULT + + +def test_upload_many_passes_concurrency_options(): + FILE_BLOB_PAIRS = [ + (tempfile.TemporaryFile(), mock.Mock(spec=Blob)), + (tempfile.TemporaryFile(), mock.Mock(spec=Blob)), + ] + MAX_WORKERS = 7 + DEADLINE = 10 + with mock.patch("concurrent.futures.ThreadPoolExecutor") as pool_patch, mock.patch( + "concurrent.futures.wait" + ) as wait_patch: + transfer_manager.upload_many( + FILE_BLOB_PAIRS, + deadline=DEADLINE, + worker_type=transfer_manager.THREAD, + max_workers=MAX_WORKERS, ) - for (file, mock_blob) in FILE_BLOB_PAIRS: - mock_blob.upload_from_file.assert_any_call(file, **EXPECTED_UPLOAD_KWARGS) - for result in results: - self.assertEqual(result, FAKE_RESULT) - - def test_upload_many_passes_concurrency_options(self): - FILE_BLOB_PAIRS = [ - (tempfile.TemporaryFile(), mock.Mock()), - (tempfile.TemporaryFile(), mock.Mock()), - ] - MAX_WORKERS = 7 - DEADLINE = 10 - with mock.patch( - "concurrent.futures.ThreadPoolExecutor" - ) as pool_patch, mock.patch("concurrent.futures.wait") as wait_patch: + pool_patch.assert_called_with(max_workers=MAX_WORKERS) + wait_patch.assert_called_with(mock.ANY, timeout=DEADLINE, return_when=mock.ANY) + + +def test_threads_deprecation_with_upload(): + FILE_BLOB_PAIRS = [ + (tempfile.TemporaryFile(), mock.Mock(spec=Blob)), + (tempfile.TemporaryFile(), mock.Mock(spec=Blob)), + ] + MAX_WORKERS = 7 + DEADLINE = 10 + with mock.patch("concurrent.futures.ThreadPoolExecutor") as pool_patch, mock.patch( + "concurrent.futures.wait" + ) as wait_patch: + with pytest.warns(): transfer_manager.upload_many( - FILE_BLOB_PAIRS, threads=MAX_WORKERS, deadline=DEADLINE - ) - pool_patch.assert_called_with(max_workers=MAX_WORKERS) - wait_patch.assert_called_with( - mock.ANY, timeout=DEADLINE, return_when=mock.ANY + FILE_BLOB_PAIRS, deadline=DEADLINE, threads=MAX_WORKERS ) + pool_patch.assert_called_with(max_workers=MAX_WORKERS) + wait_patch.assert_called_with(mock.ANY, timeout=DEADLINE, return_when=mock.ANY) + + +def test_threads_deprecation_conflict_with_upload(): + FILE_BLOB_PAIRS = [ + (tempfile.TemporaryFile(), mock.Mock(spec=Blob)), + (tempfile.TemporaryFile(), mock.Mock(spec=Blob)), + ] + MAX_WORKERS = 7 + DEADLINE = 10 + with pytest.raises(ValueError): + transfer_manager.upload_many( + FILE_BLOB_PAIRS, + deadline=DEADLINE, + threads=5, + worker_type=transfer_manager.THREAD, + max_workers=MAX_WORKERS, + ) - def test_upload_many_suppresses_exceptions(self): - FILE_BLOB_PAIRS = [("file_a.txt", mock.Mock()), ("file_b.txt", mock.Mock())] - for _, mock_blob in FILE_BLOB_PAIRS: - mock_blob.upload_from_filename.side_effect = ConnectionError() - - results = transfer_manager.upload_many(FILE_BLOB_PAIRS) - for result in results: - self.assertEqual(type(result), ConnectionError) - - def test_upload_many_raises_exceptions(self): - FILE_BLOB_PAIRS = [("file_a.txt", mock.Mock()), ("file_b.txt", mock.Mock())] - for _, mock_blob in FILE_BLOB_PAIRS: - mock_blob.upload_from_filename.side_effect = ConnectionError() - with self.assertRaises(ConnectionError): - transfer_manager.upload_many(FILE_BLOB_PAIRS, raise_exception=True) +def test_upload_many_suppresses_exceptions(): + FILE_BLOB_PAIRS = [ + ("file_a.txt", mock.Mock(spec=Blob)), + ("file_b.txt", mock.Mock(spec=Blob)), + ] + for _, mock_blob in FILE_BLOB_PAIRS: + mock_blob.upload_from_filename.side_effect = ConnectionError() + + results = transfer_manager.upload_many( + FILE_BLOB_PAIRS, worker_type=transfer_manager.THREAD + ) + for result in results: + assert isinstance(result, ConnectionError) + + +def test_upload_many_raises_exceptions(): + FILE_BLOB_PAIRS = [ + ("file_a.txt", mock.Mock(spec=Blob)), + ("file_b.txt", mock.Mock(spec=Blob)), + ] + for _, mock_blob in FILE_BLOB_PAIRS: + mock_blob.upload_from_filename.side_effect = ConnectionError() + + with pytest.raises(ConnectionError): + transfer_manager.upload_many( + FILE_BLOB_PAIRS, raise_exception=True, worker_type=transfer_manager.THREAD + ) - def test_upload_many_suppresses_412_with_skip_if_exists(self): - FILE_BLOB_PAIRS = [("file_a.txt", mock.Mock()), ("file_b.txt", mock.Mock())] - for _, mock_blob in FILE_BLOB_PAIRS: - mock_blob.upload_from_filename.side_effect = exceptions.PreconditionFailed( - "412" - ) +def test_upload_many_suppresses_412_with_skip_if_exists(): + FILE_BLOB_PAIRS = [ + ("file_a.txt", mock.Mock(spec=Blob)), + ("file_b.txt", mock.Mock(spec=Blob)), + ] + for _, mock_blob in FILE_BLOB_PAIRS: + mock_blob.upload_from_filename.side_effect = exceptions.PreconditionFailed( + "412" + ) + results = transfer_manager.upload_many( + FILE_BLOB_PAIRS, + skip_if_exists=True, + raise_exception=True, + worker_type=transfer_manager.THREAD, + ) + for result in results: + assert type(result) == exceptions.PreconditionFailed + + +def test_upload_many_with_processes(): + # Mocks are not pickleable, so we send token strings over the wire. + FILE_BLOB_PAIRS = [ + ("file_a.txt", BLOB_TOKEN_STRING), + ("file_b.txt", BLOB_TOKEN_STRING), + ] + + with mock.patch( + "google.cloud.storage.transfer_manager._call_method_on_maybe_pickled_blob", + new=_validate_blob_token_in_subprocess, + ): results = transfer_manager.upload_many( - FILE_BLOB_PAIRS, skip_if_exists=True, raise_exception=True + FILE_BLOB_PAIRS, + upload_kwargs=UPLOAD_KWARGS, + worker_type=transfer_manager.PROCESS, + raise_exception=True, ) - for result in results: - self.assertEqual(type(result), exceptions.PreconditionFailed) - - def test_download_many_with_filenames(self): - BLOB_FILE_PAIRS = [(mock.Mock(), "file_a.txt"), (mock.Mock(), "file_b.txt")] - FAKE_ENCODING = "fake_gzip" - DOWNLOAD_KWARGS = {"accept-encoding": FAKE_ENCODING} - FAKE_RESULT = "nothing to see here" + for result in results: + assert result == FAKE_RESULT + + +def test_upload_many_with_processes_rejects_file_obj(): + # Mocks are not pickleable, so we send token strings over the wire. + FILE_BLOB_PAIRS = [ + ("file_a.txt", BLOB_TOKEN_STRING), + (tempfile.TemporaryFile(), BLOB_TOKEN_STRING), + ] + + with mock.patch( + "google.cloud.storage.transfer_manager._call_method_on_maybe_pickled_blob", + new=_validate_blob_token_in_subprocess, + ): + with pytest.raises(ValueError): + transfer_manager.upload_many( + FILE_BLOB_PAIRS, + upload_kwargs=UPLOAD_KWARGS, + worker_type=transfer_manager.PROCESS, + ) - for blob_mock, _ in BLOB_FILE_PAIRS: - blob_mock.download_to_filename.return_value = FAKE_RESULT - results = transfer_manager.download_many( - BLOB_FILE_PAIRS, download_kwargs=DOWNLOAD_KWARGS +def test_download_many_with_filenames(): + BLOB_FILE_PAIRS = [ + (mock.Mock(spec=Blob), "file_a.txt"), + (mock.Mock(spec=Blob), "file_b.txt"), + ] + + for blob_mock, _ in BLOB_FILE_PAIRS: + blob_mock.download_to_filename.return_value = FAKE_RESULT + + results = transfer_manager.download_many( + BLOB_FILE_PAIRS, + download_kwargs=DOWNLOAD_KWARGS, + worker_type=transfer_manager.THREAD, + ) + for (mock_blob, file) in BLOB_FILE_PAIRS: + mock_blob.download_to_filename.assert_any_call(file, **DOWNLOAD_KWARGS) + for result in results: + assert result == FAKE_RESULT + + +def test_download_many_with_file_objs(): + BLOB_FILE_PAIRS = [ + (mock.Mock(spec=Blob), tempfile.TemporaryFile()), + (mock.Mock(spec=Blob), tempfile.TemporaryFile()), + ] + + for blob_mock, _ in BLOB_FILE_PAIRS: + blob_mock.download_to_file.return_value = FAKE_RESULT + + results = transfer_manager.download_many( + BLOB_FILE_PAIRS, + download_kwargs=DOWNLOAD_KWARGS, + worker_type=transfer_manager.THREAD, + ) + for (mock_blob, file) in BLOB_FILE_PAIRS: + mock_blob.download_to_file.assert_any_call(file, **DOWNLOAD_KWARGS) + for result in results: + assert result == FAKE_RESULT + + +def test_download_many_passes_concurrency_options(): + BLOB_FILE_PAIRS = [ + (mock.Mock(spec=Blob), tempfile.TemporaryFile()), + (mock.Mock(spec=Blob), tempfile.TemporaryFile()), + ] + MAX_WORKERS = 7 + DEADLINE = 10 + with mock.patch("concurrent.futures.ThreadPoolExecutor") as pool_patch, mock.patch( + "concurrent.futures.wait" + ) as wait_patch: + transfer_manager.download_many( + BLOB_FILE_PAIRS, + deadline=DEADLINE, + worker_type=transfer_manager.THREAD, + max_workers=MAX_WORKERS, ) - for (mock_blob, file) in BLOB_FILE_PAIRS: - mock_blob.download_to_filename.assert_any_call(file, **DOWNLOAD_KWARGS) - for result in results: - self.assertEqual(result, FAKE_RESULT) - - def test_download_many_with_file_objs(self): - BLOB_FILE_PAIRS = [ - (mock.Mock(), tempfile.TemporaryFile()), - (mock.Mock(), tempfile.TemporaryFile()), - ] - FAKE_ENCODING = "fake_gzip" - DOWNLOAD_KWARGS = {"accept-encoding": FAKE_ENCODING} - FAKE_RESULT = "nothing to see here" + pool_patch.assert_called_with(max_workers=MAX_WORKERS) + wait_patch.assert_called_with(mock.ANY, timeout=DEADLINE, return_when=mock.ANY) + + +def test_download_many_suppresses_exceptions(): + BLOB_FILE_PAIRS = [ + (mock.Mock(spec=Blob), "file_a.txt"), + (mock.Mock(spec=Blob), "file_b.txt"), + ] + for mock_blob, _ in BLOB_FILE_PAIRS: + mock_blob.download_to_filename.side_effect = ConnectionError() + + results = transfer_manager.download_many( + BLOB_FILE_PAIRS, worker_type=transfer_manager.THREAD + ) + for result in results: + assert isinstance(result, ConnectionError) + + +def test_download_many_raises_exceptions(): + BLOB_FILE_PAIRS = [ + (mock.Mock(spec=Blob), "file_a.txt"), + (mock.Mock(spec=Blob), "file_b.txt"), + ] + for mock_blob, _ in BLOB_FILE_PAIRS: + mock_blob.download_to_filename.side_effect = ConnectionError() + + with pytest.raises(ConnectionError): + transfer_manager.download_many( + BLOB_FILE_PAIRS, raise_exception=True, worker_type=transfer_manager.THREAD + ) + - for blob_mock, _ in BLOB_FILE_PAIRS: - blob_mock.download_to_file.return_value = FAKE_RESULT +def test_download_many_with_processes(): + # Mocks are not pickleable, so we send token strings over the wire. + BLOB_FILE_PAIRS = [ + (BLOB_TOKEN_STRING, "file_a.txt"), + (BLOB_TOKEN_STRING, "file_b.txt"), + ] + with mock.patch( + "google.cloud.storage.transfer_manager._call_method_on_maybe_pickled_blob", + new=_validate_blob_token_in_subprocess, + ): results = transfer_manager.download_many( - BLOB_FILE_PAIRS, download_kwargs=DOWNLOAD_KWARGS + BLOB_FILE_PAIRS, + download_kwargs=DOWNLOAD_KWARGS, + worker_type=transfer_manager.PROCESS, ) - for (mock_blob, file) in BLOB_FILE_PAIRS: - mock_blob.download_to_file.assert_any_call(file, **DOWNLOAD_KWARGS) - for result in results: - self.assertEqual(result, FAKE_RESULT) - - def test_download_many_passes_concurrency_options(self): - BLOB_FILE_PAIRS = [ - (mock.Mock(), tempfile.TemporaryFile()), - (mock.Mock(), tempfile.TemporaryFile()), - ] - MAX_WORKERS = 7 - DEADLINE = 10 - with mock.patch( - "concurrent.futures.ThreadPoolExecutor" - ) as pool_patch, mock.patch("concurrent.futures.wait") as wait_patch: + for result in results: + assert result == FAKE_RESULT + + +def test_download_many_with_processes_rejects_file_obj(): + # Mocks are not pickleable, so we send token strings over the wire. + BLOB_FILE_PAIRS = [ + (BLOB_TOKEN_STRING, "file_a.txt"), + (BLOB_TOKEN_STRING, tempfile.TemporaryFile()), + ] + + with mock.patch( + "google.cloud.storage.transfer_manager._call_method_on_maybe_pickled_blob", + new=_validate_blob_token_in_subprocess, + ): + with pytest.raises(ValueError): transfer_manager.download_many( - BLOB_FILE_PAIRS, threads=MAX_WORKERS, deadline=DEADLINE - ) - pool_patch.assert_called_with(max_workers=MAX_WORKERS) - wait_patch.assert_called_with( - mock.ANY, timeout=DEADLINE, return_when=mock.ANY + BLOB_FILE_PAIRS, + download_kwargs=DOWNLOAD_KWARGS, + worker_type=transfer_manager.PROCESS, ) - def test_download_many_suppresses_exceptions(self): - BLOB_FILE_PAIRS = [(mock.Mock(), "file_a.txt"), (mock.Mock(), "file_b.txt")] - for mock_blob, _ in BLOB_FILE_PAIRS: - mock_blob.download_to_filename.side_effect = ConnectionError() - - results = transfer_manager.download_many(BLOB_FILE_PAIRS) - for result in results: - self.assertEqual(type(result), ConnectionError) - - def test_download_many_raises_exceptions(self): - BLOB_FILE_PAIRS = [(mock.Mock(), "file_a.txt"), (mock.Mock(), "file_b.txt")] - for mock_blob, _ in BLOB_FILE_PAIRS: - mock_blob.download_to_filename.side_effect = ConnectionError() - - transfer_manager.download_many(BLOB_FILE_PAIRS) - with self.assertRaises(ConnectionError): - transfer_manager.download_many(BLOB_FILE_PAIRS, raise_exception=True) - - def test_upload_many_from_filenames(self): - bucket = mock.Mock() - - FILENAMES = ["file_a.txt", "file_b.txt"] - ROOT = "mypath/" - PREFIX = "myprefix/" - KEY_NAME = "keyname" - BLOB_CONSTRUCTOR_KWARGS = {"kms_key_name": KEY_NAME} - UPLOAD_KWARGS = {"content-type": "text/fake"} - MAX_WORKERS = 7 - DEADLINE = 10 - - EXPECTED_FILE_BLOB_PAIRS = [ - (os.path.join(ROOT, filename), mock.ANY) for filename in FILENAMES - ] - - with mock.patch( - "google.cloud.storage.transfer_manager.upload_many" - ) as mock_upload_many: - transfer_manager.upload_many_from_filenames( - bucket, - FILENAMES, - source_directory=ROOT, - blob_name_prefix=PREFIX, - skip_if_exists=True, - blob_constructor_kwargs=BLOB_CONSTRUCTOR_KWARGS, - upload_kwargs=UPLOAD_KWARGS, - threads=MAX_WORKERS, - deadline=DEADLINE, - raise_exception=True, - ) - mock_upload_many.assert_called_once_with( - EXPECTED_FILE_BLOB_PAIRS, +def test_upload_many_from_filenames(): + bucket = mock.Mock() + + FILENAMES = ["file_a.txt", "file_b.txt"] + ROOT = "mypath/" + PREFIX = "myprefix/" + KEY_NAME = "keyname" + BLOB_CONSTRUCTOR_KWARGS = {"kms_key_name": KEY_NAME} + UPLOAD_KWARGS = {"content-type": "text/fake"} + MAX_WORKERS = 7 + DEADLINE = 10 + WORKER_TYPE = transfer_manager.THREAD + + EXPECTED_FILE_BLOB_PAIRS = [ + (os.path.join(ROOT, filename), mock.ANY) for filename in FILENAMES + ] + + with mock.patch( + "google.cloud.storage.transfer_manager.upload_many" + ) as mock_upload_many: + transfer_manager.upload_many_from_filenames( + bucket, + FILENAMES, + source_directory=ROOT, + blob_name_prefix=PREFIX, skip_if_exists=True, + blob_constructor_kwargs=BLOB_CONSTRUCTOR_KWARGS, upload_kwargs=UPLOAD_KWARGS, - threads=MAX_WORKERS, deadline=DEADLINE, raise_exception=True, + worker_type=WORKER_TYPE, + max_workers=MAX_WORKERS, ) - bucket.blob.assert_any_call(PREFIX + FILENAMES[0], **BLOB_CONSTRUCTOR_KWARGS) - bucket.blob.assert_any_call(PREFIX + FILENAMES[1], **BLOB_CONSTRUCTOR_KWARGS) - def test_upload_many_from_filenames_minimal_args(self): - bucket = mock.Mock() + mock_upload_many.assert_called_once_with( + EXPECTED_FILE_BLOB_PAIRS, + skip_if_exists=True, + upload_kwargs=UPLOAD_KWARGS, + deadline=DEADLINE, + raise_exception=True, + worker_type=WORKER_TYPE, + max_workers=MAX_WORKERS, + ) + bucket.blob.assert_any_call(PREFIX + FILENAMES[0], **BLOB_CONSTRUCTOR_KWARGS) + bucket.blob.assert_any_call(PREFIX + FILENAMES[1], **BLOB_CONSTRUCTOR_KWARGS) - FILENAMES = ["file_a.txt", "file_b.txt"] - EXPECTED_FILE_BLOB_PAIRS = [(filename, mock.ANY) for filename in FILENAMES] +def test_upload_many_from_filenames_minimal_args(): + bucket = mock.Mock() - with mock.patch( - "google.cloud.storage.transfer_manager.upload_many" - ) as mock_upload_many: - transfer_manager.upload_many_from_filenames( - bucket, - FILENAMES, - ) + FILENAMES = ["file_a.txt", "file_b.txt"] - mock_upload_many.assert_called_once_with( - EXPECTED_FILE_BLOB_PAIRS, - skip_if_exists=False, - upload_kwargs=None, - threads=4, - deadline=None, - raise_exception=False, + EXPECTED_FILE_BLOB_PAIRS = [(filename, mock.ANY) for filename in FILENAMES] + + with mock.patch( + "google.cloud.storage.transfer_manager.upload_many" + ) as mock_upload_many: + transfer_manager.upload_many_from_filenames( + bucket, + FILENAMES, ) - bucket.blob.assert_any_call(FILENAMES[0]) - bucket.blob.assert_any_call(FILENAMES[1]) - def test_download_many_to_path(self): - bucket = mock.Mock() + mock_upload_many.assert_called_once_with( + EXPECTED_FILE_BLOB_PAIRS, + skip_if_exists=False, + upload_kwargs=None, + deadline=None, + raise_exception=False, + worker_type=transfer_manager.PROCESS, + max_workers=8, + ) + bucket.blob.assert_any_call(FILENAMES[0]) + bucket.blob.assert_any_call(FILENAMES[1]) + + +def test_download_many_to_path(): + bucket = mock.Mock() + + BLOBNAMES = ["file_a.txt", "file_b.txt", "dir_a/file_c.txt"] + PATH_ROOT = "mypath/" + BLOB_NAME_PREFIX = "myprefix/" + DOWNLOAD_KWARGS = {"accept-encoding": "fake-gzip"} + MAX_WORKERS = 7 + DEADLINE = 10 + WORKER_TYPE = transfer_manager.THREAD + + EXPECTED_BLOB_FILE_PAIRS = [ + (mock.ANY, os.path.join(PATH_ROOT, blobname)) for blobname in BLOBNAMES + ] + + with mock.patch( + "google.cloud.storage.transfer_manager.download_many" + ) as mock_download_many: + transfer_manager.download_many_to_path( + bucket, + BLOBNAMES, + destination_directory=PATH_ROOT, + blob_name_prefix=BLOB_NAME_PREFIX, + download_kwargs=DOWNLOAD_KWARGS, + deadline=DEADLINE, + create_directories=False, + raise_exception=True, + max_workers=MAX_WORKERS, + worker_type=WORKER_TYPE, + ) - BLOBNAMES = ["file_a.txt", "file_b.txt", "dir_a/file_c.txt"] - PATH_ROOT = "mypath/" - BLOB_NAME_PREFIX = "myprefix/" - DOWNLOAD_KWARGS = {"accept-encoding": "fake-gzip"} - MAX_WORKERS = 7 - DEADLINE = 10 + mock_download_many.assert_called_once_with( + EXPECTED_BLOB_FILE_PAIRS, + download_kwargs=DOWNLOAD_KWARGS, + deadline=DEADLINE, + raise_exception=True, + max_workers=MAX_WORKERS, + worker_type=WORKER_TYPE, + ) + for blobname in BLOBNAMES: + bucket.blob.assert_any_call(BLOB_NAME_PREFIX + blobname) + + +def test_download_many_to_path_creates_directories(): + bucket = mock.Mock() + + with tempfile.TemporaryDirectory() as tempdir: + DIR_NAME = "dir_a/dir_b" + BLOBNAMES = [ + "file_a.txt", + "file_b.txt", + os.path.join(DIR_NAME, "file_c.txt"), + ] EXPECTED_BLOB_FILE_PAIRS = [ - (mock.ANY, os.path.join(PATH_ROOT, blobname)) for blobname in BLOBNAMES + (mock.ANY, os.path.join(tempdir, blobname)) for blobname in BLOBNAMES ] with mock.patch( @@ -277,59 +502,214 @@ def test_download_many_to_path(self): transfer_manager.download_many_to_path( bucket, BLOBNAMES, - destination_directory=PATH_ROOT, - blob_name_prefix=BLOB_NAME_PREFIX, - download_kwargs=DOWNLOAD_KWARGS, - threads=MAX_WORKERS, - deadline=DEADLINE, - create_directories=False, + destination_directory=tempdir, + create_directories=True, raise_exception=True, ) mock_download_many.assert_called_once_with( EXPECTED_BLOB_FILE_PAIRS, - download_kwargs=DOWNLOAD_KWARGS, - threads=MAX_WORKERS, - deadline=DEADLINE, + download_kwargs=None, + deadline=None, raise_exception=True, + worker_type=transfer_manager.PROCESS, + max_workers=8, ) for blobname in BLOBNAMES: - bucket.blob.assert_any_call(BLOB_NAME_PREFIX + blobname) - - def test_download_many_to_path_creates_directories(self): - bucket = mock.Mock() - - with tempfile.TemporaryDirectory() as tempdir: - DIR_NAME = "dir_a/dir_b" - BLOBNAMES = [ - "file_a.txt", - "file_b.txt", - os.path.join(DIR_NAME, "file_c.txt"), - ] - - EXPECTED_BLOB_FILE_PAIRS = [ - (mock.ANY, os.path.join(tempdir, blobname)) for blobname in BLOBNAMES - ] - - with mock.patch( - "google.cloud.storage.transfer_manager.download_many" - ) as mock_download_many: - transfer_manager.download_many_to_path( - bucket, - BLOBNAMES, - destination_directory=tempdir, - create_directories=True, - raise_exception=True, - ) - - mock_download_many.assert_called_once_with( - EXPECTED_BLOB_FILE_PAIRS, - download_kwargs=None, - threads=4, - deadline=None, - raise_exception=True, + bucket.blob.assert_any_call(blobname) + + assert os.path.isdir(os.path.join(tempdir, DIR_NAME)) + + +def test_download_chunks_concurrently(): + blob_mock = mock.Mock(spec=Blob) + FILENAME = "file_a.txt" + MULTIPLE = 4 + blob_mock.size = CHUNK_SIZE * MULTIPLE + + blob_mock.download_to_filename.return_value = FAKE_RESULT + + with mock.patch("__main__.open", mock.mock_open()): + result = transfer_manager.download_chunks_concurrently( + blob_mock, + FILENAME, + chunk_size=CHUNK_SIZE, + download_kwargs=DOWNLOAD_KWARGS, + worker_type=transfer_manager.THREAD, + ) + for x in range(MULTIPLE): + blob_mock.download_to_file.assert_any_call( + mock.ANY, + **DOWNLOAD_KWARGS, + start=x * CHUNK_SIZE, + end=((x + 1) * CHUNK_SIZE) - 1 + ) + assert blob_mock.download_to_file.call_count == 4 + assert result is None + + +def test_download_chunks_concurrently_raises_on_start_and_end(): + blob_mock = mock.Mock(spec=Blob) + FILENAME = "file_a.txt" + MULTIPLE = 4 + blob_mock.size = CHUNK_SIZE * MULTIPLE + + with mock.patch("__main__.open", mock.mock_open()): + with pytest.raises(ValueError): + transfer_manager.download_chunks_concurrently( + blob_mock, + FILENAME, + chunk_size=CHUNK_SIZE, + worker_type=transfer_manager.THREAD, + download_kwargs={ + "start": CHUNK_SIZE, + }, ) - for blobname in BLOBNAMES: - bucket.blob.assert_any_call(blobname) + with pytest.raises(ValueError): + transfer_manager.download_chunks_concurrently( + blob_mock, + FILENAME, + chunk_size=CHUNK_SIZE, + worker_type=transfer_manager.THREAD, + download_kwargs={ + "end": (CHUNK_SIZE * (MULTIPLE - 1)) - 1, + }, + ) + + +def test_download_chunks_concurrently_passes_concurrency_options(): + blob_mock = mock.Mock(spec=Blob) + FILENAME = "file_a.txt" + MAX_WORKERS = 7 + DEADLINE = 10 + MULTIPLE = 4 + blob_mock.size = CHUNK_SIZE * MULTIPLE + + with mock.patch("concurrent.futures.ThreadPoolExecutor") as pool_patch, mock.patch( + "concurrent.futures.wait" + ) as wait_patch, mock.patch("__main__.open", mock.mock_open()): + transfer_manager.download_chunks_concurrently( + blob_mock, + FILENAME, + chunk_size=CHUNK_SIZE, + deadline=DEADLINE, + worker_type=transfer_manager.THREAD, + max_workers=MAX_WORKERS, + ) + pool_patch.assert_called_with(max_workers=MAX_WORKERS) + wait_patch.assert_called_with(mock.ANY, timeout=DEADLINE, return_when=mock.ANY) + + +class _PickleableMockBlob: + def __init__( + self, + name="", + size=None, + generation=None, + size_after_reload=None, + generation_after_reload=None, + ): + self.name = name + self.size = size + self.generation = generation + self._size_after_reload = size_after_reload + self._generation_after_reload = generation_after_reload + + def reload(self): + self.size = self._size_after_reload + self.generation = self._generation_after_reload + + def download_to_file(self, *args, **kwargs): + return "SUCCESS" + + +# Used in subprocesses only, so excluded from coverage +def _validate_blob_token_in_subprocess_for_chunk( + maybe_pickled_blob, filename, **kwargs +): # pragma: NO COVER + blob = pickle.loads(maybe_pickled_blob) + assert isinstance(blob, _PickleableMockBlob) + assert filename.startswith("file") + return FAKE_RESULT + + +def test_download_chunks_concurrently_with_processes(): + blob = _PickleableMockBlob( + "file_a_blob", size_after_reload=24, generation_after_reload=100 + ) + FILENAME = "file_a.txt" + + with mock.patch( + "google.cloud.storage.transfer_manager._download_and_write_chunk_in_place", + new=_validate_blob_token_in_subprocess_for_chunk, + ), mock.patch("__main__.open", mock.mock_open()): + result = transfer_manager.download_chunks_concurrently( + blob, + FILENAME, + chunk_size=CHUNK_SIZE, + download_kwargs=DOWNLOAD_KWARGS, + worker_type=transfer_manager.PROCESS, + ) + assert result is None + + +def test__LazyClient(): + fake_cache = {} + MOCK_ID = 9999 + with mock.patch( + "google.cloud.storage.transfer_manager._cached_clients", new=fake_cache + ), mock.patch("google.cloud.storage.transfer_manager.Client"): + lazyclient = transfer_manager._LazyClient(MOCK_ID) + lazyclient_cached = transfer_manager._LazyClient(MOCK_ID) + assert lazyclient is lazyclient_cached + assert len(fake_cache) == 1 + + +def test__pickle_blob(): + # This test nominally has coverage, but doesn't assert that the essential + # copyreg behavior in _pickle_blob works. Unfortunately there doesn't seem + # to be a good way to check that without actually creating a Client, which + # will spin up HTTP connections undesirably. This is more fully checked in + # the system tests, though. + pkl = transfer_manager._pickle_blob(FAKE_RESULT) + assert pickle.loads(pkl) == FAKE_RESULT + + +def test__download_and_write_chunk_in_place(): + pickled_mock = pickle.dumps(_PickleableMockBlob()) + FILENAME = "file_a.txt" + with mock.patch("__main__.open", mock.mock_open()): + result = transfer_manager._download_and_write_chunk_in_place( + pickled_mock, FILENAME, 0, 8, {} + ) + assert result == "SUCCESS" + + +def test__get_pool_class_and_requirements_error(): + with pytest.raises(ValueError): + transfer_manager._get_pool_class_and_requirements("garbage") + + +def test__reduce_client(): + fake_cache = {} + client = mock.Mock() + + with mock.patch( + "google.cloud.storage.transfer_manager._cached_clients", new=fake_cache + ), mock.patch("google.cloud.storage.transfer_manager.Client"): + transfer_manager._reduce_client(client) + + +def test__call_method_on_maybe_pickled_blob(): + blob = mock.Mock(spec=Blob) + blob.download_to_file.return_value = "SUCCESS" + result = transfer_manager._call_method_on_maybe_pickled_blob( + blob, "download_to_file" + ) + assert result == "SUCCESS" - assert os.path.isdir(os.path.join(tempdir, DIR_NAME)) + pickled_blob = pickle.dumps(_PickleableMockBlob()) + result = transfer_manager._call_method_on_maybe_pickled_blob( + pickled_blob, "download_to_file" + ) + assert result == "SUCCESS"