forked from GoogleCloudPlatform/python-docs-samples
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathvm_identity.py
More file actions
107 lines (91 loc) · 4.06 KB
/
vm_identity.py
File metadata and controls
107 lines (91 loc) · 4.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#!/usr/bin/env python
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Example of verifying Google Compute Engine virtual machine identity.
This sample will work only on a GCE virtual machine, as it relies on
communication with metadata server
(https://cloud.google.com/compute/docs/storing-retrieving-metadata).
Example is used on:
https://cloud.google.com/compute/docs/instances/verifying-instance-identity
"""
import pprint
# [START compute_vm_identity_verify_token]
import google.auth.transport.requests
from google.oauth2 import id_token
# [END compute_vm_identity_verify_token]
# [START compute_vm_identity_acquire_token]
import requests
AUDIENCE_URL = "http://www.example.com"
METADATA_HEADERS = {"Metadata-Flavor": "Google"}
METADATA_VM_IDENTITY_URL = (
"http://metadata.google.internal/computeMetadata/v1/"
"instance/service-accounts/default/identity?"
"audience={audience}&format={format}&licenses={licenses}"
)
FORMAT = "full"
LICENSES = "TRUE"
def acquire_token(
audience: str = AUDIENCE_URL, format: str = "standard", licenses: bool = True
) -> str:
"""
Requests identity information from the metadata server.
Args:
audience: the unique URI agreed upon by both the instance and the
system verifying the instance's identity. For example, the audience
could be a URL for the connection between the two systems.
format: the optional parameter that specifies whether the project and
instance details are included in the payload. Specify `full` to
include this information in the payload or standard to omit the
information from the payload. The default value is `standard`.
licenses: an optional parameter that specifies whether license
codes for images associated with this instance are included in the
payload. Specify TRUE to include this information or FALSE to omit
this information from the payload. The default value is FALSE.
Has no effect unless format is `full`.
Returns:
A JSON Web Token signed using the RS256 algorithm. The token includes a
Google signature and additional information in the payload. You can send
this token to other systems and applications so that they can verify the
token and confirm that the identity of your instance.
"""
# Construct a URL with the audience and format.
url = METADATA_VM_IDENTITY_URL.format(
audience=audience, format=format, licenses=licenses
)
# Request a token from the metadata server.
r = requests.get(url, headers=METADATA_HEADERS)
# Extract and return the token from the response.
r.raise_for_status()
return r.text
# [END compute_vm_identity_acquire_token]
# [START compute_vm_identity_verify_token]
def verify_token(token: str, audience: str) -> dict:
"""Verify token signature and return the token payload.
Args:
token: the JSON Web Token received from the metadata server to
be verified.
audience: the unique URI agreed upon by both the instance and the
system verifying the instance's identity.
Returns:
Dictionary containing the token payload.
"""
request = google.auth.transport.requests.Request()
payload = id_token.verify_token(token, request=request, audience=audience)
return payload
# [END compute_vm_identity_verify_token]
if __name__ == "__main__":
token_ = acquire_token(AUDIENCE_URL)
print("Received token:", token_)
print("Token verification:")
pprint.pprint(verify_token(token_, AUDIENCE_URL))