Skip to content

Commit 1ee976a

Browse files
author
ddeleo
committed
lint/format using ruff
1 parent 567789c commit 1ee976a

File tree

1 file changed

+29
-56
lines changed

1 file changed

+29
-56
lines changed

composer/tools/composer_migrate.py

Lines changed: 29 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,14 @@
1616
"""Standalone script for migrating environments from Composer 2 to Composer 3."""
1717

1818
import argparse
19-
import json
19+
import logging
2020
import math
2121
import pprint
2222
import time
2323
from typing import Any, Dict, List
2424

2525
import google.auth
2626
from google.auth.transport.requests import AuthorizedSession
27-
import requests
28-
29-
import logging
30-
3127

3228
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(message)s")
3329
logger = logging.getLogger(__name__)
@@ -77,72 +73,60 @@ def create_environment_from_config(self, config: Any) -> Any:
7773
# The API expects the resource name in the format:
7874
# projects/{project}/locations/{location}/environments/{environment_name}
7975
if "name" not in config:
80-
raise ValueError("Environment name is missing in the config.")
81-
76+
raise ValueError("Environment name is missing in the config.")
77+
8278
# Extract environment ID from the full name if needed as query param,
8379
# but the original code didn't use it, so we trust the body 'name' field.
8480
# However, usually for Create, we might need environmentId query param if we want to specify it explicitly
8581
# and it's not inferred.
8682
# The original code did: POST .../environments with body.
87-
83+
8884
response = self.session.post(url, json=config)
8985
if response.status_code == 409:
9086
logger.info("Environment already exists, skipping creation.")
9187
return
9288

9389
if response.status_code != 200:
94-
raise RuntimeError(
95-
f"Failed to create environment: {response.text}"
96-
)
97-
90+
raise RuntimeError(f"Failed to create environment: {response.text}")
91+
9892
operation = response.json()
9993
logger.info("Create environment operation: %s", operation["name"])
10094
self._wait_for_operation(operation["name"])
10195

102-
10396
def list_dags(self, environment_name: str) -> List[Dict[str, Any]]:
10497
"""Returns a list of DAGs in a given Composer environment."""
10598
airflow_uri = self._get_airflow_uri(environment_name)
106-
99+
107100
url = f"{airflow_uri}/api/v1/dags"
108101
response = self.session.get(url)
109102
if response.status_code != 200:
110-
raise RuntimeError(
111-
f"Failed to list DAGs: {response.text}"
112-
)
103+
raise RuntimeError(f"Failed to list DAGs: {response.text}")
113104
return response.json()["dags"]
114105

115-
116106
def pause_dag(
117107
self,
118108
dag_id: str,
119109
environment_name: str,
120110
) -> Any:
121111
"""Pauses a DAG in a Composer environment."""
122112
airflow_uri = self._get_airflow_uri(environment_name)
123-
113+
124114
url = f"{airflow_uri}/api/v1/dags/{dag_id}"
125115
response = self.session.patch(url, json={"is_paused": True})
126116
if response.status_code != 200:
127-
raise RuntimeError(
128-
f"Failed to pause DAG {dag_id}: {response.text}"
129-
)
130-
117+
raise RuntimeError(f"Failed to pause DAG {dag_id}: {response.text}")
131118

132119
def pause_all_dags(
133120
self,
134121
environment_name: str,
135122
) -> Any:
136123
"""Pauses all DAGs in a Composer environment."""
137124
airflow_uri = self._get_airflow_uri(environment_name)
138-
139-
url = f"{airflow_uri}/api/v1/dags?dag_id_pattern=%" # Pause all DAGs using % as a wildcard
125+
126+
url = f"{airflow_uri}/api/v1/dags?dag_id_pattern=%" # Pause all DAGs using % as a wildcard
140127
response = self.session.patch(url, json={"is_paused": True})
141128
if response.status_code != 200:
142-
raise RuntimeError(
143-
f"Failed to pause all DAGs: {response.text}"
144-
)
145-
129+
raise RuntimeError(f"Failed to pause all DAGs: {response.text}")
146130

147131
def unpause_dag(
148132
self,
@@ -151,27 +135,23 @@ def unpause_dag(
151135
) -> Any:
152136
"""Unpauses a DAG in a Composer environment."""
153137
airflow_uri = self._get_airflow_uri(environment_name)
154-
138+
155139
url = f"{airflow_uri}/api/v1/dags/{dag_id}"
156140
response = self.session.patch(url, json={"is_paused": False})
157141
if response.status_code != 200:
158-
raise RuntimeError(
159-
f"Failed to unpause DAG {dag_id}: {response.text}"
160-
)
142+
raise RuntimeError(f"Failed to unpause DAG {dag_id}: {response.text}")
161143

162144
def unpause_all_dags(
163145
self,
164146
environment_name: str,
165147
) -> Any:
166148
"""Unpauses all DAGs in a Composer environment."""
167149
airflow_uri = self._get_airflow_uri(environment_name)
168-
169-
url = f"{airflow_uri}/api/v1/dags?dag_id_pattern=%" # Pause all DAGs using % as a wildcard
150+
151+
url = f"{airflow_uri}/api/v1/dags?dag_id_pattern=%" # Pause all DAGs using % as a wildcard
170152
response = self.session.patch(url, json={"is_paused": False})
171153
if response.status_code != 200:
172-
raise RuntimeError(
173-
f"Failed to unpause all DAGs: {response.text}"
174-
)
154+
raise RuntimeError(f"Failed to unpause all DAGs: {response.text}")
175155

176156
def save_snapshot(self, environment_name: str) -> str:
177157
"""Saves a snapshot of a Composer environment."""
@@ -181,10 +161,8 @@ def save_snapshot(self, environment_name: str) -> str:
181161
)
182162
response = self.session.post(url, json={})
183163
if response.status_code != 200:
184-
raise RuntimeError(
185-
f"Failed to initiate snapshot save: {response.text}"
186-
)
187-
164+
raise RuntimeError(f"Failed to initiate snapshot save: {response.text}")
165+
188166
operation = response.json()
189167
logging.info("Save snapshot operation: %s", operation["name"])
190168
completed_operation = self._wait_for_operation(operation["name"])
@@ -202,36 +180,31 @@ def load_snapshot(
202180
)
203181
response = self.session.post(url, json={"snapshotPath": snapshot_path})
204182
if response.status_code != 200:
205-
raise RuntimeError(
206-
f"Failed to initiate snapshot load: {response.text}"
207-
)
183+
raise RuntimeError(f"Failed to initiate snapshot load: {response.text}")
208184

209185
operation = response.json()
210186
logging.info("Load snapshot operation: %s", operation["name"])
211187
self._wait_for_operation(operation["name"])
212188

213-
214189
def _wait_for_operation(self, operation_name: str) -> Any:
215190
"""Waits for a long-running operation to complete."""
216191
# operation_name is distinct from operation_id.
217192
# It is a full resource name: projects/.../locations/.../operations/...
218-
193+
219194
# We need to poll the operation status.
220195
url = f"{self.sdk_endpoint}/v1/{operation_name}"
221-
196+
222197
while True:
223198
response = self.session.get(url)
224199
if response.status_code != 200:
225-
raise RuntimeError(
226-
f"Failed to get operation status: {response.text}"
227-
)
200+
raise RuntimeError(f"Failed to get operation status: {response.text}")
228201
operation = response.json()
229202
if "done" in operation and operation["done"]:
230203
if "error" in operation:
231204
raise RuntimeError(f"Operation failed: {operation['error']}")
232205
logging.info("Operation completed successfully.")
233206
return operation
234-
207+
235208
logging.info("Waiting for operation to complete...")
236209
time.sleep(10)
237210

@@ -416,7 +389,6 @@ def main(
416389
sdk_endpoint: str,
417390
dry_run: bool,
418391
) -> int:
419-
420392
client = ComposerClient(
421393
project=project_name, location=location, sdk_endpoint=sdk_endpoint
422394
)
@@ -507,7 +479,9 @@ def main(
507479
else:
508480
for dag in source_env_dags:
509481
if dag["is_paused"]:
510-
logger.info("DAG %s was paused in the source environment.", dag["dag_id"])
482+
logger.info(
483+
"DAG %s was paused in the source environment.", dag["dag_id"]
484+
)
511485
continue
512486
logger.info("Unpausing DAG %s in the target environment.", dag["dag_id"])
513487
client.unpause_dag(dag["dag_id"], target_environment_name)
@@ -559,8 +533,7 @@ def parse_arguments() -> Dict[Any, Any]:
559533
action="store_true",
560534
default=False,
561535
help=(
562-
"If true, script will only print the config for the Composer 3"
563-
" environment."
536+
"If true, script will only print the config for the Composer 3 environment."
564537
),
565538
)
566539
argument_parser.add_argument(

0 commit comments

Comments
 (0)