-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathworkflow.py
More file actions
160 lines (140 loc) · 5.84 KB
/
workflow.py
File metadata and controls
160 lines (140 loc) · 5.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
"""Migrate workflows from source org to target org.
Workflow rows themselves are simple — no required FKs to clone
entities, unique per ``(workflow_name, organization)``. The two
non-trivial bits:
1. ``source_settings`` and ``destination_settings`` are JSON blobs that
embed connector UUIDs. The walker remaps them using the running
``RemapTable`` (connectors already landed in the previous phase).
2. Creating a workflow auto-creates empty ``WorkflowEndpoint`` rows
server-side. We don't touch those here — the dedicated
WorkflowEndpoint phase reconciles them after ToolInstance lands.
"""
from __future__ import annotations
import logging
import threading
from typing import Any
from unstract.clone.exceptions import NameConflictError
from unstract.clone.phases.base import Phase, build_post_payload
from unstract.clone.report import CloneReport, PhaseResult
from unstract.clone.walker import remap_uuids
logger = logging.getLogger(__name__)
WORKFLOW_PATH = "workflow/"
class WorkflowPhase(Phase):
name = "workflow"
def run(self, report: CloneReport) -> PhaseResult:
result = report.get_phase(self.name)
try:
self._writable = self.ctx.target.get_post_schema(WORKFLOW_PATH)
except Exception as e:
logger.exception("Failed to fetch target POST schema for workflow: %s", e)
result.failed += 1
result.errors.append(f"OPTIONS workflow: {e}")
return result
try:
src_workflows = self.ctx.source.list_workflows()
except Exception as e:
logger.exception("Failed to list source workflows: %s", e)
result.failed += 1
result.errors.append(f"list source workflows: {e}")
return result
# Built once so per-workflow cascade-skip checks stay O(1).
self._wf_to_src_tool_id = self._collect_wf_tool_map(result)
logger.info("Found %d workflow(s) in source org", len(src_workflows))
self.parallel_map(
src_workflows,
lambda src, lock: self._clone_one(src, result, lock),
)
return result
def _collect_wf_tool_map(self, result: PhaseResult) -> dict[str, str]:
"""Map source workflow_id to its ToolInstance.tool_id; listed once
to avoid N+1 fetches.
"""
if not self.ctx.skipped_custom_tool_registry_ids:
return {}
try:
tis = self.ctx.source.list_tool_instances()
except Exception as e:
logger.warning(
"workflow phase: failed to list source tool_instances for "
"cascade-skip lookup (%s); proceeding without cascade",
e,
)
return {}
mapping: dict[str, str] = {}
for ti in tis:
wf_id = ti.get("workflow")
tool_id = ti.get("tool_id")
if wf_id and tool_id:
mapping[wf_id] = tool_id
return mapping
def _clone_one(
self, src: dict[str, Any], result: PhaseResult, lock: threading.Lock
) -> None:
name = src["workflow_name"]
src_id = src["id"]
src_tool_id = self._wf_to_src_tool_id.get(src_id)
if src_tool_id and src_tool_id in self.ctx.skipped_custom_tool_registry_ids:
logger.warning(
"skipping workflow '%s' src=%s — its tool was skipped in "
"custom_tool phase (frictionless adapter dependence)",
name,
src_id,
)
with lock:
result.skipped += 1
return
try:
existing = self.ctx.target.list_workflows(name=name)
except Exception as e:
logger.exception("Failed to GET workflow %s on target: %s", name, e)
with lock:
result.failed += 1
result.errors.append(f"GET {name}: {e}")
return
if existing:
tgt = existing[0]
if self.ctx.options.on_name_conflict == "abort":
raise NameConflictError(
f"workflow '{name}' already exists in target as {tgt['id']}"
)
with lock:
result.adopted += 1
logger.info(
"adopted workflow '%s' src=%s -> tgt=%s", name, src_id, tgt["id"]
)
elif self.ctx.options.dry_run:
with lock:
result.skipped += 1
logger.info("[dry-run] would create workflow '%s' src=%s", name, src_id)
return
else:
# List endpoints serve stripped payloads (e.g. AdapterListSerializer
# omits adapter_metadata_b); workflow detail carries the JSON blobs
# source_settings / destination_settings that embed connector UUIDs.
try:
src_detail = self.ctx.source.get_workflow(src_id)
except Exception as e:
logger.exception(
"Failed to GET source workflow %s detail: %s", name, e
)
with lock:
result.failed += 1
result.errors.append(f"GET source detail {name}: {e}")
return
remapped = remap_uuids(src_detail, self.ctx.remap)
payload = build_post_payload(remapped, self._writable)
try:
tgt = self.ctx.target.create_workflow(payload)
except Exception as e:
logger.exception("Failed to create workflow %s: %s", name, e)
with lock:
result.failed += 1
result.errors.append(f"create {name}: {e}")
return
with lock:
result.created += 1
logger.info(
"created workflow '%s' src=%s -> tgt=%s", name, src_id, tgt["id"]
)
with lock:
self.ctx.remap.record("workflow", src_id, tgt["id"])