-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathorchestrator.py
More file actions
115 lines (103 loc) · 3.89 KB
/
orchestrator.py
File metadata and controls
115 lines (103 loc) · 3.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
"""Top-level ``clone()`` entry point.
Wires source/target ``PlatformClient`` instances, builds a
``CloneContext``, runs each phase in strict topological order, and
returns a ``CloneReport``.
Phase order is owned here — phases must not call each other. Adding a new
entity type means: write a new ``Phase`` subclass and append it to
``PHASES`` at the right dependency position.
"""
from __future__ import annotations
import logging
import time
from unstract.clone.client import PlatformClient
from unstract.clone.context import CloneContext, CloneOptions, OrgEndpoint
from unstract.clone.exceptions import CloneError
from unstract.clone.phases import (
AdapterPhase,
APIDeploymentPhase,
ConnectorPhase,
CustomToolPhase,
FilesPhase,
PipelinePhase,
TagPhase,
ToolInstancePhase,
WorkflowEndpointPhase,
WorkflowPhase,
)
from unstract.clone.phases.base import Phase
from unstract.clone.report import CloneReport, Endpoint
logger = logging.getLogger(__name__)
# Strict dependency order. Each entry: (phase_name, phase_class).
# Adapter, connector, tag are independent leaf phases. Downstream phases
# (custom_tool, workflow, tool_instance, workflow_endpoint) land later
# and consume the remap entries these produce. Pipeline + api_deployment
# come last: both FK the workflow and api_deployment additionally
# requires endpoints to be configured before the serializer accepts it.
PHASES: list[tuple[str, type[Phase]]] = [
("adapter", AdapterPhase),
("connector", ConnectorPhase),
("tag", TagPhase),
("custom_tool", CustomToolPhase),
("files", FilesPhase),
("workflow", WorkflowPhase),
("tool_instance", ToolInstancePhase),
("workflow_endpoint", WorkflowEndpointPhase),
("pipeline", PipelinePhase),
("api_deployment", APIDeploymentPhase),
]
def clone(
source: OrgEndpoint,
target: OrgEndpoint,
options: CloneOptions | None = None,
) -> CloneReport:
"""Migrate configured resources from one org to another.
Returns a ``CloneReport`` even on partial failure; raises only on
setup errors or ``on_name_conflict='abort'`` collisions.
"""
opts = options or CloneOptions()
src_client = PlatformClient(source)
tgt_client = PlatformClient(target)
try:
ctx = CloneContext(
source=src_client,
target=tgt_client,
options=opts,
)
report = CloneReport(
source=Endpoint(
base_url=source.base_url, organization_id=source.organization_id
),
target=Endpoint(
base_url=target.base_url, organization_id=target.organization_id
),
)
run_started = time.perf_counter()
for name, phase_cls in PHASES:
if not opts.includes(name):
report.skipped_phases.append(name)
logger.info("Phase '%s' skipped (excluded)", name)
continue
logger.info("=== Phase: %s ===", name)
phase_started = time.perf_counter()
try:
phase_cls(ctx).run(report)
except CloneError as e:
report.aborted = True
report.abort_reason = str(e)
logger.error("Phase '%s' aborted: %s", name, e)
# Stamp duration even on abort so the report reflects time spent.
report.get_phase(name).duration_s = time.perf_counter() - phase_started
break
else:
report.get_phase(name).duration_s = time.perf_counter() - phase_started
logger.info(
"=== Phase '%s' done in %.2fs ===",
name,
report.get_phase(name).duration_s,
)
report.total_duration_s = time.perf_counter() - run_started
report.remap_snapshot = ctx.remap.snapshot()
return report
finally:
src_client.close()
tgt_client.close()