@@ -124,15 +124,15 @@ def pause_all_dags(
124124 self ,
125125 environment_name : str ,
126126 ) -> Any :
127- """Pauses a DAG in a Composer environment."""
127+ """Pauses all DAGs in a Composer environment."""
128128 environment = self .get_environment (environment_name )
129129 airflow_uri = environment ["config" ]["airflowUri" ]
130130
131131 url = f"{ airflow_uri } /api/v1/dags?dag_id_pattern=%" # Pause all DAGs using % as a wildcard
132132 response = self .session .patch (url , json = {"is_paused" : True })
133133 if response .status_code != 200 :
134134 raise RuntimeError (
135- f"Failed to pause DAG { dag_id } : { response .text } "
135+ f"Failed to pause all DAGs : { response .text } "
136136 )
137137
138138
@@ -156,15 +156,15 @@ def unpause_all_dags(
156156 self ,
157157 environment_name : str ,
158158 ) -> Any :
159- """Pauses a DAG in a Composer environment."""
159+ """Unpauses all DAGs in a Composer environment."""
160160 environment = self .get_environment (environment_name )
161161 airflow_uri = environment ["config" ]["airflowUri" ]
162162
163163 url = f"{ airflow_uri } /api/v1/dags?dag_id_pattern=%" # Pause all DAGs using % as a wildcard
164164 response = self .session .patch (url , json = {"is_paused" : False })
165165 if response .status_code != 200 :
166166 raise RuntimeError (
167- f"Failed to pause DAG { dag_id } : { response .text } "
167+ f"Failed to unpause all DAGs : { response .text } "
168168 )
169169
170170 def save_snapshot (self , environment_name : str ) -> str :
@@ -489,15 +489,18 @@ def main(
489489 all_dags_present = set (source_env_dag_ids ) == set (target_env_dag_ids )
490490 logger .info ("List of DAGs in the target environment: %s" , target_env_dag_ids )
491491 # Unpause only DAGs that were not paused in the source environment.
492- for dag in source_env_dags :
493- if dag ["dag_id" ] == "airflow_monitoring" :
494- continue
495- if dag ["is_paused" ]:
496- logger .info ("DAG %s was paused in the source environment." , dag ["dag_id" ])
497- continue
498- logger .info ("Unpausing DAG %s in the target environment." , dag ["dag_id" ])
499- client .unpause_dag (dag ["dag_id" ], target_environment_name )
500- logger .info ("DAG %s unpaused." , dag ["dag_id" ])
492+ # Optimization: if all DAGs were unpaused in source, use bulk unpause.
493+ if not any (d ["is_paused" ] for d in source_env_dags ):
494+ logger .info ("All DAGs were unpaused in source. Unpausing all DAGs in target." )
495+ client .unpause_all_dags (target_environment_name )
496+ else :
497+ for dag in source_env_dags :
498+ if dag ["is_paused" ]:
499+ logger .info ("DAG %s was paused in the source environment." , dag ["dag_id" ])
500+ continue
501+ logger .info ("Unpausing DAG %s in the target environment." , dag ["dag_id" ])
502+ client .unpause_dag (dag ["dag_id" ], target_environment_name )
503+ logger .info ("DAG %s unpaused." , dag ["dag_id" ])
501504 logger .info ("DAGs in the target environment unpaused." )
502505
503506 logger .info ("Migration complete." )
0 commit comments