Skip to content

Commit 73ac8c5

Browse files
author
ddeleo
committed
feat: modify pause_dag and unpause_dag to operate on all DAGs and optimize restore_dags_state to use bulk unpause when possible.
1 parent fc2be09 commit 73ac8c5

1 file changed

Lines changed: 16 additions & 13 deletions

File tree

composer/tools/composer_migrate.py

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -124,15 +124,15 @@ def pause_all_dags(
124124
self,
125125
environment_name: str,
126126
) -> Any:
127-
"""Pauses a DAG in a Composer environment."""
127+
"""Pauses all DAGs in a Composer environment."""
128128
environment = self.get_environment(environment_name)
129129
airflow_uri = environment["config"]["airflowUri"]
130130

131131
url = f"{airflow_uri}/api/v1/dags?dag_id_pattern=%" # Pause all DAGs using % as a wildcard
132132
response = self.session.patch(url, json={"is_paused": True})
133133
if response.status_code != 200:
134134
raise RuntimeError(
135-
f"Failed to pause DAG {dag_id}: {response.text}"
135+
f"Failed to pause all DAGs: {response.text}"
136136
)
137137

138138

@@ -156,15 +156,15 @@ def unpause_all_dags(
156156
self,
157157
environment_name: str,
158158
) -> Any:
159-
"""Pauses a DAG in a Composer environment."""
159+
"""Unpauses all DAGs in a Composer environment."""
160160
environment = self.get_environment(environment_name)
161161
airflow_uri = environment["config"]["airflowUri"]
162162

163163
url = f"{airflow_uri}/api/v1/dags?dag_id_pattern=%" # Pause all DAGs using % as a wildcard
164164
response = self.session.patch(url, json={"is_paused": False})
165165
if response.status_code != 200:
166166
raise RuntimeError(
167-
f"Failed to pause DAG {dag_id}: {response.text}"
167+
f"Failed to unpause all DAGs: {response.text}"
168168
)
169169

170170
def save_snapshot(self, environment_name: str) -> str:
@@ -489,15 +489,18 @@ def main(
489489
all_dags_present = set(source_env_dag_ids) == set(target_env_dag_ids)
490490
logger.info("List of DAGs in the target environment: %s", target_env_dag_ids)
491491
# Unpause only DAGs that were not paused in the source environment.
492-
for dag in source_env_dags:
493-
if dag["dag_id"] == "airflow_monitoring":
494-
continue
495-
if dag["is_paused"]:
496-
logger.info("DAG %s was paused in the source environment.", dag["dag_id"])
497-
continue
498-
logger.info("Unpausing DAG %s in the target environment.", dag["dag_id"])
499-
client.unpause_dag(dag["dag_id"], target_environment_name)
500-
logger.info("DAG %s unpaused.", dag["dag_id"])
492+
# Optimization: if all DAGs were unpaused in source, use bulk unpause.
493+
if not any(d["is_paused"] for d in source_env_dags):
494+
logger.info("All DAGs were unpaused in source. Unpausing all DAGs in target.")
495+
client.unpause_all_dags(target_environment_name)
496+
else:
497+
for dag in source_env_dags:
498+
if dag["is_paused"]:
499+
logger.info("DAG %s was paused in the source environment.", dag["dag_id"])
500+
continue
501+
logger.info("Unpausing DAG %s in the target environment.", dag["dag_id"])
502+
client.unpause_dag(dag["dag_id"], target_environment_name)
503+
logger.info("DAG %s unpaused.", dag["dag_id"])
501504
logger.info("DAGs in the target environment unpaused.")
502505

503506
logger.info("Migration complete.")

0 commit comments

Comments
 (0)