|
11 | 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | 12 | # See the License for the specific language governing permissions and |
13 | 13 | # limitations under the License. |
14 | | - |
15 | 14 | """System tests for VideoIntelligence API.""" |
16 | 15 |
|
17 | 16 | import json |
18 | 17 | import os |
19 | 18 | import requests |
20 | | -import time |
21 | 19 | import unittest |
22 | 20 |
|
| 21 | +from google.auth.transport import requests as goog_auth_requests |
23 | 22 | from google.cloud import videointelligence |
24 | | -from google.cloud.videointelligence_v1 import enums |
| 23 | +from google.oauth2 import service_account |
| 24 | + |
| 25 | +CLOUD_PLATFORM_SCOPE = "https://www.googleapis.com/auth/cloud-platform" |
| 26 | +CREDENTIALS_FILE = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS") |
| 27 | +OUTSIDE_BUCKET = os.environ.get("GOOGLE_CLOUD_TESTS_VPCSC_OUTSIDE_PERIMETER_BUCKET") |
| 28 | +INSIDE_BUCKET = os.environ.get("GOOGLE_CLOUD_TESTS_VPCSC_INSIDE_PERIMETER_BUCKET") |
| 29 | +IS_INSIDE_VPCSC = os.environ.get("GOOGLE_CLOUD_TESTS_IN_VPCSC") |
25 | 30 |
|
26 | | -PROJECT_NUMBER = os.environ.get("PROJECT_NUMBER") |
27 | | -OUTSIDE_PROJECT_API_KEY = os.environ.get( |
28 | | - "GOOGLE_CLOUD_TESTS_VPCSC_OUTSIDE_PERIMETER_PROJECT_API_KEY" |
29 | | -) |
30 | | -OUTSIDE_IP = os.environ.get("GOOGLE_CLOUD_TESTS_VPCSC_OUTSIDE_IP") |
31 | | -INSIDE_IP = os.environ.get("GOOGLE_CLOUD_TESTS_VPCSC_INSIDE_IP") |
| 31 | + |
| 32 | +def get_access_token(): |
| 33 | + """Returns an access token. |
| 34 | +
|
| 35 | + Generates access tokens using the provided service account key file. |
| 36 | + """ |
| 37 | + creds = service_account.Credentials.from_service_account_file( |
| 38 | + CREDENTIALS_FILE, scopes=[CLOUD_PLATFORM_SCOPE] |
| 39 | + ) |
| 40 | + with requests.Session() as session: |
| 41 | + creds.refresh(goog_auth_requests.Request(session=session)) |
| 42 | + return creds.token |
32 | 43 |
|
33 | 44 |
|
34 | 45 | class VideoIntelligenceSystemTestBase(unittest.TestCase): |
35 | 46 | client = None |
36 | 47 |
|
37 | | - def setUp(self): |
38 | | - self.input_uri = "gs://cloud-samples-data/video/cat.mp4" |
39 | | - |
40 | 48 |
|
41 | 49 | def setUpModule(): |
42 | 50 | VideoIntelligenceSystemTestBase.client = ( |
43 | 51 | videointelligence.VideoIntelligenceServiceClient() |
44 | 52 | ) |
45 | 53 |
|
46 | 54 |
|
47 | | -class TestVideoIntelligenceClient(VideoIntelligenceSystemTestBase): |
48 | | - def test_annotate_video(self): |
49 | | - features_element = enums.Feature.LABEL_DETECTION |
50 | | - features = [features_element] |
51 | | - response = self.client.annotate_video( |
52 | | - input_uri=self.input_uri, features=features |
53 | | - ) |
54 | | - |
55 | | - # Wait for the operation to complete. |
56 | | - # Long timeout value warranted due to https://github.com/grpc/grpc/issues/19173 |
57 | | - lro_timeout_seconds = 180 |
58 | | - start_time = time.time() |
59 | | - cnt = 0 |
60 | | - while not response.done() and (time.time() - start_time) < lro_timeout_seconds: |
61 | | - time.sleep(1) |
62 | | - cnt += 1 |
63 | | - if not response.done(): |
64 | | - self.fail( |
65 | | - "wait for operation timed out after {lro_timeout_seconds} seconds".format( |
66 | | - lro_timeout_seconds=lro_timeout_seconds |
67 | | - ) |
68 | | - ) |
69 | | - |
70 | | - result = response.result() |
71 | | - annotations = result.annotation_results[0] |
72 | | - assert len(annotations.segment_label_annotations) > 0 |
73 | | - |
74 | | - |
75 | 55 | @unittest.skipUnless( |
76 | | - OUTSIDE_PROJECT_API_KEY, |
77 | | - "GOOGLE_CLOUD_TESTS_VPCSC_OUTSIDE_PERIMETER_PROJECT_API_KEY not set in environment.", |
| 56 | + CREDENTIALS_FILE, "GOOGLE_APPLICATION_CREDENTIALS not set in environment." |
78 | 57 | ) |
79 | 58 | class TestVideoIntelligenceClientVpcSc(VideoIntelligenceSystemTestBase): |
80 | | - # Tests to verify VideoIntelligence service requests blocked when trying to access resources outside of a secure perimeter. |
| 59 | + # Tests to verify VideoIntelligence service requests blocked when trying to |
| 60 | + # access resources outside of a secure perimeter. |
81 | 61 | def setUp(self): |
82 | 62 | VideoIntelligenceSystemTestBase.setUp(self) |
83 | 63 | # api-endpoint |
84 | | - self.url = "https://videointelligence.googleapis.com/v1/videos:annotate?key={}".format( |
85 | | - OUTSIDE_PROJECT_API_KEY |
86 | | - ) |
87 | | - self.body = { |
88 | | - "input_uri": self.input_uri, |
89 | | - "features": ["LABEL_DETECTION"], |
90 | | - "location_id": "us-west1", |
91 | | - } |
| 64 | + self.url = "https://videointelligence.googleapis.com/v1/videos:annotate" |
| 65 | + self.body = {"features": ["LABEL_DETECTION"], "location_id": "us-west1"} |
92 | 66 |
|
93 | | - @unittest.skipUnless(PROJECT_NUMBER, "PROJECT_NUMBER not set in environment.") |
94 | 67 | @unittest.skipUnless( |
95 | | - OUTSIDE_IP, "GOOGLE_CLOUD_TESTS_VPCSC_OUTSIDE_IP not set in environment." |
| 68 | + OUTSIDE_BUCKET, |
| 69 | + "GOOGLE_CLOUD_TESTS_VPCSC_OUTSIDE_PERIMETER_BUCKET not set in environment.", |
96 | 70 | ) |
97 | | - def test_outside_ip_address_blocked(self): |
| 71 | + @unittest.skipUnless( |
| 72 | + IS_INSIDE_VPCSC, "GOOGLE_CLOUD_TESTS_IN_VPCSC not set in environment." |
| 73 | + ) |
| 74 | + def test_outside_perimeter_blocked(self): |
98 | 75 | headers = { |
| 76 | + "Authorization": "Bearer " + get_access_token(), |
99 | 77 | "Content-Type": "application/json", |
100 | | - "X-User-IP": OUTSIDE_IP, |
101 | | - "X-Google-GFE-Cloud-Client-Network-Project-Number": PROJECT_NUMBER, |
102 | 78 | } |
| 79 | + self.body["input_uri"] = "gs://{bucket}/cat.mp4".format(bucket=OUTSIDE_BUCKET) |
103 | 80 | r = requests.post(url=self.url, data=json.dumps(self.body), headers=headers) |
104 | | - outside_project_operation = json.loads(r.text) |
105 | | - print(outside_project_operation) |
| 81 | + resp = json.loads(r.text) |
| 82 | + print(resp) |
106 | 83 | # Assert it returns permission denied from VPC SC |
107 | | - self.assertEqual(outside_project_operation["error"]["code"], 403) |
108 | | - self.assertEqual( |
109 | | - outside_project_operation["error"]["status"], "PERMISSION_DENIED" |
110 | | - ) |
111 | | - self.assertEqual( |
112 | | - outside_project_operation["error"]["details"][0]["violations"][0]["type"], |
113 | | - "VPC_SERVICE_CONTROLS", |
114 | | - ) |
115 | | - self.assertEqual( |
116 | | - outside_project_operation["error"]["message"], |
117 | | - "Request is prohibited by organization's policy", |
118 | | - ) |
| 84 | + self.assertEqual(resp["error"]["code"], 403) |
| 85 | + self.assertEqual(resp["error"]["status"], "PERMISSION_DENIED") |
119 | 86 |
|
120 | | - @unittest.skipUnless(PROJECT_NUMBER, "PROJECT_NUMBER not set in environment.") |
121 | 87 | @unittest.skipUnless( |
122 | | - INSIDE_IP, "GOOGLE_CLOUD_TESTS_VPCSC_INSIDE_IP not set in environment." |
| 88 | + INSIDE_BUCKET, |
| 89 | + "GOOGLE_CLOUD_TESTS_VPCSC_INSIDE_PERIMETER_BUCKET not set in environment.", |
| 90 | + ) |
| 91 | + @unittest.skipUnless( |
| 92 | + IS_INSIDE_VPCSC, "GOOGLE_CLOUD_TESTS_IN_VPCSC not set in environment." |
123 | 93 | ) |
124 | | - def test_inside_ip_address_allowed(self): |
| 94 | + def test_inside_perimeter_allowed(self): |
125 | 95 | headers = { |
| 96 | + "Authorization": "Bearer " + get_access_token(), |
126 | 97 | "Content-Type": "application/json", |
127 | | - "X-User-IP": INSIDE_IP, |
128 | | - "X-Google-GFE-Cloud-Client-Network-Project-Number": PROJECT_NUMBER, |
129 | 98 | } |
| 99 | + self.body["input_uri"] = "gs://{bucket}/cat.mp4".format(bucket=INSIDE_BUCKET) |
130 | 100 | r = requests.post(url=self.url, data=json.dumps(self.body), headers=headers) |
131 | 101 | operation = json.loads(r.text) |
132 | | - # Assert it returns non-empty operation name. |
133 | | - self.assertNotEqual(operation["name"], "") |
| 102 | + print(operation) |
| 103 | + |
| 104 | + get_op_url = "https://videointelligence.googleapis.com/v1/" + operation["name"] |
| 105 | + get_op = requests.get(url=get_op_url, headers=headers) |
| 106 | + get_op_resp = json.loads(get_op.text) |
| 107 | + print(get_op_resp) |
| 108 | + # Assert that we do not get an error. |
| 109 | + self.assertEqual(get_op_resp["name"], operation["name"]) |
0 commit comments