-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathbase.py
More file actions
115 lines (98 loc) · 3.93 KB
/
base.py
File metadata and controls
115 lines (98 loc) · 3.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
"""Base class for clone phases."""
from __future__ import annotations
import logging
import threading
from abc import ABC, abstractmethod
from collections.abc import Callable, Iterable
from concurrent.futures import FIRST_EXCEPTION, Future, ThreadPoolExecutor, wait
from typing import Any, TypeVar
from unstract.clone.context import CloneContext
from unstract.clone.exceptions import CloneError
from unstract.clone.report import CloneReport, PhaseResult
T = TypeVar("T")
logger = logging.getLogger(__name__)
# Field names that are never sent in a POST body.
#
# DRF's OPTIONS response lists every ModelSerializer FK/M2M as writable,
# yet the backend's perform_create overrides these values server-side.
# Sending them is therefore either noise (the value is silently
# overwritten) or a 400 (when a source-org value does not validate against
# the target org). They are stripped from every payload; the per-phase
# OPTIONS schema covers the entity-specific writable subset.
#
# Entries are listed alphabetically.
SERVER_MANAGED: frozenset[str] = frozenset(
    (
        "created_at",
        "created_by",
        "created_by_email",
        "id",
        "modified_at",
        "modified_by",
        "modified_by_email",
        "organization",
        "shared_users",
        "workflow_owner",
    )
)
def build_post_payload(src: dict[str, Any], writable: frozenset[str]) -> dict[str, Any]:
    """Project ``src`` onto the writable schema to form a POST body.

    A key survives only when it is listed in ``writable``, is not one of
    the ``SERVER_MANAGED`` fields, and its value in ``src`` is neither
    ``None`` nor the empty string (which DRF treats as blank and rejects
    on required fields).

    ``src`` is not modified. The returned dict is new and its keys follow
    the order in which they appear in ``src``.
    """
    allowed = writable - SERVER_MANAGED
    # Walk ``src`` rather than the set: set iteration order is arbitrary and
    # changes between processes under string hash randomisation, whereas
    # following ``src`` keeps the payload's key order stable and reproducible.
    #
    # ``None`` is tested by identity and ``""`` by equality -- never by
    # truthiness -- so falsy-but-meaningful values such as a ``BooleanField``
    # ``False`` or a numeric ``0`` are kept.
    return {
        key: value
        for key, value in src.items()
        if key in allowed and value is not None and value != ""
    }
class Phase(ABC):
    """Common base for clone phases; each entity type gets its own subclass."""

    # Also used as the suffix of the worker thread-name prefix in
    # ``parallel_map``.
    name: str = ""

    def __init__(self, ctx: CloneContext):
        self.ctx = ctx

    @abstractmethod
    def run(self, report: CloneReport) -> PhaseResult:
        """Migrate every entity of this phase's type. Idempotent across runs."""
        raise NotImplementedError

    def parallel_map(
        self,
        items: Iterable[T],
        work_fn: Callable[[T, threading.Lock], None],
    ) -> None:
        """Call ``work_fn(item, lock)`` for every item, using up to
        ``ctx.options.concurrency`` threads.

        One lock is created per call and handed to every invocation;
        ``work_fn`` must hold it while mutating shared state. When the
        effective concurrency is 1 (values below 1 are clamped), items
        are processed inline in order and no executor is created, so an
        exception from ``work_fn`` propagates immediately.

        On the threaded path, the first worker exception stops the wait,
        a cancel is requested on every submitted future, and one
        exception is re-raised: a ``CloneError`` if any finished worker
        raised one, otherwise the first other exception found.
        """
        queue = list(items)
        if not queue:
            return

        shared_lock = threading.Lock()
        workers = max(1, self.ctx.options.concurrency)

        if workers == 1:
            for entry in queue:
                work_fn(entry, shared_lock)
            return

        failures: list[BaseException] = []
        with ThreadPoolExecutor(
            max_workers=workers,
            thread_name_prefix=f"clone-{self.name}",
        ) as pool:
            submitted: list[Future[None]] = []
            for entry in queue:
                submitted.append(pool.submit(work_fn, entry, shared_lock))

            # Returns as soon as one future raises, or when all complete.
            finished, _ = wait(submitted, return_when=FIRST_EXCEPTION)
            for fut in finished:
                # ``exception()`` raises on a cancelled future, so skip those.
                if fut.cancelled():
                    continue
                err = fut.exception()
                if err is not None:
                    failures.append(err)

            if failures:
                # Futures that are already running or done ignore the cancel.
                for fut in submitted:
                    fut.cancel()

        if not failures:
            return
        # A CloneError takes precedence over any other worker failure.
        for err in failures:
            if isinstance(err, CloneError):
                raise err
        raise failures[0]