forked from googleapis/google-cloud-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcredentials.py
More file actions
295 lines (236 loc) · 11.8 KB
/
credentials.py
File metadata and controls
295 lines (236 loc) · 11.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
# Copyright 2014 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A simple wrapper around the OAuth2 credentials library."""
import base64
import calendar
import datetime
import six
from six.moves.urllib.parse import urlencode # pylint: disable=F0401
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
from oauth2client import client
from oauth2client.client import _get_application_default_credential_from_file
from oauth2client import crypt
from oauth2client import service_account
import pytz
def get_credentials():
"""Gets credentials implicitly from the current environment.
.. note::
You should not need to use this function directly. Instead, use the
helper method :func:`gcloud.datastore.__init__.get_connection`
which uses this method under the hood.
Checks environment in order of precedence:
* Google App Engine (production and testing)
* Environment variable GOOGLE_APPLICATION_CREDENTIALS pointing to
a file with stored credentials information.
* Stored "well known" file associated with ``gcloud`` command line tool.
* Google Compute Engine production environment.
The file referred to in GOOGLE_APPLICATION_CREDENTIALS is expected to
contain information about credentials that are ready to use. This means
either service account information or user account information with
a ready-to-use refresh token::
{ {
'type': 'authorized_user', 'type': 'service_account',
'client_id': '...', 'client_id': '...',
'client_secret': '...', OR 'client_email': '...',
'refresh_token': '..., 'private_key_id': '...',
} 'private_key': '...',
}
The second of these is simply a JSON key downloaded from the Google APIs
console. The first is a close cousin of the "client secrets" JSON file
used by ``oauth2client.clientsecrets`` but differs in formatting.
:rtype: :class:`oauth2client.client.GoogleCredentials`,
:class:`oauth2client.appengine.AppAssertionCredentials`,
:class:`oauth2client.gce.AppAssertionCredentials`,
:class:`oauth2client.service_account._ServiceAccountCredentials`
:returns: A new credentials instance corresponding to the implicit
environment.
"""
return client.GoogleCredentials.get_application_default()
def get_for_service_account_json(json_credentials_path, scope=None):
"""Gets the credentials for a service account with JSON key.
:type json_credentials_path: string
:param json_credentials_path: The path to a private key file (this file was
given to you when you created the service
account). This file must contain a JSON
object with a private key and other
credentials information (downloaded from the
Google APIs console).
:type scope: string or tuple of string
:param scope: The scope against which to authenticate. (Different services
require different scopes, check the documentation for which
scope is required for the different levels of access to any
particular API.)
:rtype: :class:`oauth2client.client.GoogleCredentials`,
:class:`oauth2client.service_account._ServiceAccountCredentials`
:returns: New service account or Google (for a user JSON key file)
credentials object.
"""
credentials = _get_application_default_credential_from_file(
json_credentials_path)
if scope is not None:
credentials = credentials.create_scoped(scope)
return credentials
def get_for_service_account_p12(client_email, private_key_path, scope=None):
"""Gets the credentials for a service account with PKCS12 / p12 key.
.. note::
This method is not used by default, instead :func:`get_credentials`
is used. This method is intended to be used when the environments is
known explicitly and detecting the environment implicitly would be
superfluous.
:type client_email: string
:param client_email: The e-mail attached to the service account.
:type private_key_path: string
:param private_key_path: The path to a private key file (this file was
given to you when you created the service
account). This file must be in P12 format.
:type scope: string or tuple of string
:param scope: The scope against which to authenticate. (Different services
require different scopes, check the documentation for which
scope is required for the different levels of access to any
particular API.)
:rtype: :class:`oauth2client.client.SignedJwtAssertionCredentials`
:returns: A new ``SignedJwtAssertionCredentials`` instance with the
needed service account settings.
"""
return client.SignedJwtAssertionCredentials(
service_account_name=client_email,
private_key=open(private_key_path, 'rb').read(),
scope=scope)
def _get_pem_key(credentials):
"""Gets RSA key for a PEM payload from a credentials object.
:type credentials: :class:`client.SignedJwtAssertionCredentials`,
:class:`service_account._ServiceAccountCredentials`
:param credentials: The credentials used to create an RSA key
for signing text.
:rtype: :class:`Crypto.PublicKey.RSA._RSAobj`
:returns: An RSA object used to sign text.
:raises: `TypeError` if `credentials` is the wrong type.
"""
if isinstance(credentials, client.SignedJwtAssertionCredentials):
# Take our PKCS12 (.p12) key and make it into a RSA key we can use.
pem_text = crypt.pkcs12_key_as_pem(credentials.private_key,
credentials.private_key_password)
elif isinstance(credentials, service_account._ServiceAccountCredentials):
pem_text = credentials._private_key_pkcs8_text
else:
raise TypeError((credentials,
'not a valid service account credentials type'))
return RSA.importKey(pem_text)
def _get_signed_query_params(credentials, expiration, signature_string):
"""Gets query parameters for creating a signed URL.
:type credentials: :class:`client.SignedJwtAssertionCredentials`,
:class:`service_account._ServiceAccountCredentials`
:param credentials: The credentials used to create an RSA key
for signing text.
:type expiration: int or long
:param expiration: When the signed URL should expire.
:type signature_string: string
:param signature_string: The string to be signed by the credentials.
:rtype: dict
:returns: Query parameters matching the signing credentials with a
signed payload.
"""
pem_key = _get_pem_key(credentials)
# Sign the string with the RSA key.
signer = PKCS1_v1_5.new(pem_key)
if not isinstance(signature_string, six.binary_type):
signature_string = signature_string.encode('utf-8')
signature_hash = SHA256.new(signature_string)
signature_bytes = signer.sign(signature_hash)
signature = base64.b64encode(signature_bytes)
if isinstance(credentials, client.SignedJwtAssertionCredentials):
service_account_name = credentials.service_account_name
elif isinstance(credentials, service_account._ServiceAccountCredentials):
service_account_name = credentials._service_account_email
# We know one of the above must occur since `_get_pem_key` fails if not.
return {
'GoogleAccessId': service_account_name,
'Expires': str(expiration),
'Signature': signature,
}
def _utcnow(): # pragma: NO COVER testing replaces
"""Returns current time as UTC datetime.
NOTE: on the module namespace so tests can replace it.
"""
return datetime.datetime.utcnow()
def _get_expiration_seconds(expiration):
"""Convert 'expiration' to a number of seconds in the future.
:type expiration: int, long, datetime.datetime, datetime.timedelta
:param expiration: When the signed URL should expire.
:rtype: int
:returns: a timestamp as an absolute number of seconds.
"""
# If it's a timedelta, add it to `now` in UTC.
if isinstance(expiration, datetime.timedelta):
now = _utcnow().replace(tzinfo=pytz.utc)
expiration = now + expiration
# If it's a datetime, convert to a timestamp.
if isinstance(expiration, datetime.datetime):
# Make sure the timezone on the value is UTC
# (either by converting or replacing the value).
if expiration.tzinfo:
expiration = expiration.astimezone(pytz.utc)
else:
expiration = expiration.replace(tzinfo=pytz.utc)
# Turn the datetime into a timestamp (seconds, not microseconds).
expiration = int(calendar.timegm(expiration.timetuple()))
if not isinstance(expiration, six.integer_types):
raise TypeError('Expected an integer timestamp, datetime, or '
'timedelta. Got %s' % type(expiration))
return expiration
def generate_signed_url(credentials, resource, expiration,
api_access_endpoint='',
method='GET', content_md5=None,
content_type=None):
"""Generate signed URL to provide query-string auth'n to a resource.
:type credentials: :class:`oauth2client.appengine.AppAssertionCredentials`
:param credentials: Credentials object with an associated private key to
sign text.
:type resource: string
:param resource: A pointer to a specific resource
(typically, ``/bucket-name/path/to/blob.txt``).
:type expiration: int, long, datetime.datetime, datetime.timedelta
:param expiration: When the signed URL should expire.
:type api_access_endpoint: string
:param api_access_endpoint: Optional URI base. Defaults to empty string.
:type method: string
:param method: The HTTP verb that will be used when requesting the URL.
:type content_md5: string
:param content_md5: The MD5 hash of the object referenced by
``resource``.
:type content_type: string
:param content_type: The content type of the object referenced by
``resource``.
:rtype: string
:returns: A signed URL you can use to access the resource
until expiration.
"""
expiration = _get_expiration_seconds(expiration)
# Generate the string to sign.
signature_string = '\n'.join([
method,
content_md5 or '',
content_type or '',
str(expiration),
resource])
# Set the right query parameters.
query_params = _get_signed_query_params(credentials,
expiration,
signature_string)
# Return the built URL.
return '{endpoint}{resource}?{querystring}'.format(
endpoint=api_access_endpoint, resource=resource,
querystring=urlencode(query_params))