1616"""Standalone script for migrating environments from Composer 2 to Composer 3."""
1717
1818import argparse
19- import json
19+ import logging
2020import math
2121import pprint
2222import time
2323from typing import Any , Dict , List
2424
2525import google .auth
2626from google .auth .transport .requests import AuthorizedSession
27- import requests
28-
29- import logging
30-
3127
3228logging .basicConfig (level = logging .DEBUG , format = "%(asctime)s - %(message)s" )
3329logger = logging .getLogger (__name__ )
@@ -77,72 +73,60 @@ def create_environment_from_config(self, config: Any) -> Any:
7773 # The API expects the resource name in the format:
7874 # projects/{project}/locations/{location}/environments/{environment_name}
7975 if "name" not in config :
80- raise ValueError ("Environment name is missing in the config." )
81-
76+ raise ValueError ("Environment name is missing in the config." )
77+
8278 # Extract environment ID from the full name if needed as query param,
8379 # but the original code didn't use it, so we trust the body 'name' field.
8480 # However, usually for Create, we might need environmentId query param if we want to specify it explicitly
8581 # and it's not inferred.
8682 # The original code did: POST .../environments with body.
87-
83+
8884 response = self .session .post (url , json = config )
8985 if response .status_code == 409 :
9086 logger .info ("Environment already exists, skipping creation." )
9187 return
9288
9389 if response .status_code != 200 :
94- raise RuntimeError (
95- f"Failed to create environment: { response .text } "
96- )
97-
90+ raise RuntimeError (f"Failed to create environment: { response .text } " )
91+
9892 operation = response .json ()
9993 logger .info ("Create environment operation: %s" , operation ["name" ])
10094 self ._wait_for_operation (operation ["name" ])
10195
102-
10396 def list_dags (self , environment_name : str ) -> List [Dict [str , Any ]]:
10497 """Returns a list of DAGs in a given Composer environment."""
10598 airflow_uri = self ._get_airflow_uri (environment_name )
106-
99+
107100 url = f"{ airflow_uri } /api/v1/dags"
108101 response = self .session .get (url )
109102 if response .status_code != 200 :
110- raise RuntimeError (
111- f"Failed to list DAGs: { response .text } "
112- )
103+ raise RuntimeError (f"Failed to list DAGs: { response .text } " )
113104 return response .json ()["dags" ]
114105
115-
116106 def pause_dag (
117107 self ,
118108 dag_id : str ,
119109 environment_name : str ,
120110 ) -> Any :
121111 """Pauses a DAG in a Composer environment."""
122112 airflow_uri = self ._get_airflow_uri (environment_name )
123-
113+
124114 url = f"{ airflow_uri } /api/v1/dags/{ dag_id } "
125115 response = self .session .patch (url , json = {"is_paused" : True })
126116 if response .status_code != 200 :
127- raise RuntimeError (
128- f"Failed to pause DAG { dag_id } : { response .text } "
129- )
130-
117+ raise RuntimeError (f"Failed to pause DAG { dag_id } : { response .text } " )
131118
132119 def pause_all_dags (
133120 self ,
134121 environment_name : str ,
135122 ) -> Any :
136123 """Pauses all DAGs in a Composer environment."""
137124 airflow_uri = self ._get_airflow_uri (environment_name )
138-
139- url = f"{ airflow_uri } /api/v1/dags?dag_id_pattern=%" # Pause all DAGs using % as a wildcard
125+
126+ url = f"{ airflow_uri } /api/v1/dags?dag_id_pattern=%" # Pause all DAGs using % as a wildcard
140127 response = self .session .patch (url , json = {"is_paused" : True })
141128 if response .status_code != 200 :
142- raise RuntimeError (
143- f"Failed to pause all DAGs: { response .text } "
144- )
145-
129+ raise RuntimeError (f"Failed to pause all DAGs: { response .text } " )
146130
147131 def unpause_dag (
148132 self ,
@@ -151,27 +135,23 @@ def unpause_dag(
151135 ) -> Any :
152136 """Unpauses a DAG in a Composer environment."""
153137 airflow_uri = self ._get_airflow_uri (environment_name )
154-
138+
155139 url = f"{ airflow_uri } /api/v1/dags/{ dag_id } "
156140 response = self .session .patch (url , json = {"is_paused" : False })
157141 if response .status_code != 200 :
158- raise RuntimeError (
159- f"Failed to unpause DAG { dag_id } : { response .text } "
160- )
142+ raise RuntimeError (f"Failed to unpause DAG { dag_id } : { response .text } " )
161143
162144 def unpause_all_dags (
163145 self ,
164146 environment_name : str ,
165147 ) -> Any :
166148 """Unpauses all DAGs in a Composer environment."""
167149 airflow_uri = self ._get_airflow_uri (environment_name )
168-
169- url = f"{ airflow_uri } /api/v1/dags?dag_id_pattern=%" # Pause all DAGs using % as a wildcard
150+
151+ url = f"{ airflow_uri } /api/v1/dags?dag_id_pattern=%" # Pause all DAGs using % as a wildcard
170152 response = self .session .patch (url , json = {"is_paused" : False })
171153 if response .status_code != 200 :
172- raise RuntimeError (
173- f"Failed to unpause all DAGs: { response .text } "
174- )
154+ raise RuntimeError (f"Failed to unpause all DAGs: { response .text } " )
175155
176156 def save_snapshot (self , environment_name : str ) -> str :
177157 """Saves a snapshot of a Composer environment."""
@@ -181,10 +161,8 @@ def save_snapshot(self, environment_name: str) -> str:
181161 )
182162 response = self .session .post (url , json = {})
183163 if response .status_code != 200 :
184- raise RuntimeError (
185- f"Failed to initiate snapshot save: { response .text } "
186- )
187-
164+ raise RuntimeError (f"Failed to initiate snapshot save: { response .text } " )
165+
188166 operation = response .json ()
189167 logging .info ("Save snapshot operation: %s" , operation ["name" ])
190168 completed_operation = self ._wait_for_operation (operation ["name" ])
@@ -202,36 +180,31 @@ def load_snapshot(
202180 )
203181 response = self .session .post (url , json = {"snapshotPath" : snapshot_path })
204182 if response .status_code != 200 :
205- raise RuntimeError (
206- f"Failed to initiate snapshot load: { response .text } "
207- )
183+ raise RuntimeError (f"Failed to initiate snapshot load: { response .text } " )
208184
209185 operation = response .json ()
210186 logging .info ("Load snapshot operation: %s" , operation ["name" ])
211187 self ._wait_for_operation (operation ["name" ])
212188
213-
214189 def _wait_for_operation (self , operation_name : str ) -> Any :
215190 """Waits for a long-running operation to complete."""
216191 # operation_name is distinct from operation_id.
217192 # It is a full resource name: projects/.../locations/.../operations/...
218-
193+
219194 # We need to poll the operation status.
220195 url = f"{ self .sdk_endpoint } /v1/{ operation_name } "
221-
196+
222197 while True :
223198 response = self .session .get (url )
224199 if response .status_code != 200 :
225- raise RuntimeError (
226- f"Failed to get operation status: { response .text } "
227- )
200+ raise RuntimeError (f"Failed to get operation status: { response .text } " )
228201 operation = response .json ()
229202 if "done" in operation and operation ["done" ]:
230203 if "error" in operation :
231204 raise RuntimeError (f"Operation failed: { operation ['error' ]} " )
232205 logging .info ("Operation completed successfully." )
233206 return operation
234-
207+
235208 logging .info ("Waiting for operation to complete..." )
236209 time .sleep (10 )
237210
@@ -416,7 +389,6 @@ def main(
416389 sdk_endpoint : str ,
417390 dry_run : bool ,
418391) -> int :
419-
420392 client = ComposerClient (
421393 project = project_name , location = location , sdk_endpoint = sdk_endpoint
422394 )
@@ -507,7 +479,9 @@ def main(
507479 else :
508480 for dag in source_env_dags :
509481 if dag ["is_paused" ]:
510- logger .info ("DAG %s was paused in the source environment." , dag ["dag_id" ])
482+ logger .info (
483+ "DAG %s was paused in the source environment." , dag ["dag_id" ]
484+ )
511485 continue
512486 logger .info ("Unpausing DAG %s in the target environment." , dag ["dag_id" ])
513487 client .unpause_dag (dag ["dag_id" ], target_environment_name )
@@ -559,8 +533,7 @@ def parse_arguments() -> Dict[Any, Any]:
559533 action = "store_true" ,
560534 default = False ,
561535 help = (
562- "If true, script will only print the config for the Composer 3"
563- " environment."
536+ "If true, script will only print the config for the Composer 3 environment."
564537 ),
565538 )
566539 argument_parser .add_argument (
0 commit comments