-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrollback_run.py
More file actions
298 lines (250 loc) · 10.2 KB
/
rollback_run.py
File metadata and controls
298 lines (250 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
#!/usr/bin/env python3
"""Rollback a completed SourcePipeline run from the command line.
Thin CLI wrapper around ``SourcePipeline.rollback(run_id)``. The base
class already contains the full rollback logic (LIFO over
``ingestion_impacts`` → per-action reversal via ``_rollback_insert`` /
``_rollback_flip`` / ``_rollback_scd``) plus a valid-state-transition
guard on ``ingestion_manifest.fetch_status``. This wrapper only
provides:
* dry-run reporting: manifest status, impact counts, pre-state row
summary on the target table, and what the rollback would do
* safety flags: ``--confirm`` + ``--i-understand-this-writes`` gate
any write; ``--allow-prod`` required when the DB path resolves to
``data/13f.duckdb``
* idempotency: refuses to re-rollback a run already in
``rolled_back`` status with a clear message
Use cases:
* int-22 — reverse the re-load of ``13f_holdings`` for
``quarter=2025Q4`` on 2026-04-22 (run_id
``13f_holdings_quarter=2025Q4_20260422_200854``) that reset
``is_latest`` on tickerless rows and flipped the enriched
population to FALSE.
Examples::
# dry-run against staging (default; never writes)
python3 scripts/rollback_run.py \\
--run-id 13f_holdings_quarter=2025Q4_20260422_200854 \\
--db data/13f_staging.duckdb
# execute against staging
python3 scripts/rollback_run.py \\
--run-id 13f_holdings_quarter=2025Q4_20260422_200854 \\
--db data/13f_staging.duckdb \\
--confirm --i-understand-this-writes
# execute against prod (Terminal-only per staging-write protocol)
python3 scripts/rollback_run.py \\
--run-id 13f_holdings_quarter=2025Q4_20260422_200854 \\
--confirm --i-understand-this-writes --allow-prod
"""
from __future__ import annotations
import argparse
import os
import sys
from pathlib import Path
from typing import Any
import duckdb
# Directory two levels above this file (the parent of the directory that
# contains this script).
ROOT = Path(__file__).resolve().parent.parent
# Put ROOT/scripts at the front of the import path; presumably this is what
# lets the deferred ``pipeline.pipelines`` import in ``_build_pipeline``
# resolve -- TODO confirm the package actually lives under scripts/.
sys.path.insert(0, str(ROOT / "scripts"))
PROD_DB_BASENAME = "13f.duckdb"
def _is_prod_path(db_path: str) -> bool:
"""True if ``db_path`` resolves to the prod DB file name."""
return os.path.basename(os.path.abspath(db_path)) == PROD_DB_BASENAME
def _load_manifest_row(con: Any, run_id: str) -> dict | None:
row = con.execute(
"SELECT manifest_id, source_type, fetch_status "
"FROM ingestion_manifest WHERE run_id = ? "
"ORDER BY manifest_id DESC LIMIT 1",
[run_id],
).fetchone()
if not row:
return None
return {
"manifest_id": int(row[0]),
"source_type": row[1],
"fetch_status": row[2],
}
def _impact_counts(con: Any, manifest_id: int) -> list[tuple[str, int]]:
rows = con.execute(
"SELECT unit_type, COUNT(*) "
"FROM ingestion_impacts WHERE manifest_id = ? "
"GROUP BY unit_type ORDER BY unit_type",
[manifest_id],
).fetchall()
return [(r[0], int(r[1])) for r in rows]
def _holdings_v2_quarter_report(con: Any, quarter: str) -> dict:
"""Pre/post state snapshot for a single quarter on holdings_v2.
Reports the four cells of the is_latest × ticker matrix that the
int-22 diagnosis used."""
row = con.execute(
"SELECT "
" SUM(CASE WHEN is_latest THEN 1 ELSE 0 END), "
" SUM(CASE WHEN is_latest AND ticker IS NULL THEN 1 ELSE 0 END), "
" SUM(CASE WHEN is_latest AND ticker IS NOT NULL THEN 1 ELSE 0 END), "
" SUM(CASE WHEN NOT is_latest THEN 1 ELSE 0 END), "
" COUNT(*) "
"FROM holdings_v2 WHERE quarter = ?",
[quarter],
).fetchone() # nosec B608
return {
"is_latest_TRUE": int(row[0] or 0),
"is_latest_TRUE_AND_ticker_NULL": int(row[1] or 0),
"is_latest_TRUE_AND_ticker_NOT_NULL": int(row[2] or 0),
"is_latest_FALSE": int(row[3] or 0),
"total": int(row[4] or 0),
}
def _print_report(label: str, rep: dict) -> None:
print(f" {label}")
print(f" is_latest=TRUE : {rep['is_latest_TRUE']:>12,}")
print(f" is_latest=TRUE & ticker NULL: {rep['is_latest_TRUE_AND_ticker_NULL']:>12,}")
print(f" is_latest=TRUE & ticker set : {rep['is_latest_TRUE_AND_ticker_NOT_NULL']:>12,}")
print(f" is_latest=FALSE : {rep['is_latest_FALSE']:>12,}")
print(f" total rows : {rep['total']:>12,}")
def _reporting_quarter_for(run_id: str) -> str | None:
"""Extract ``quarter=YYYYQN`` from run_id if present. Only used to
scope the before/after reporting summary."""
for part in run_id.split("_"):
if part.startswith("quarter="):
return part.split("=", 1)[1]
return None
def _build_pipeline(source_type: str, db_path: str) -> Any:
    """Create the pipeline registered for ``source_type`` against ``db_path``.

    The registry value may be the pipeline class itself or a non-class
    callable that returns it; the latter is called with no arguments first.
    The same ``db_path`` is passed as both the prod and the staging DB path.

    Raises:
        SystemExit: if ``source_type`` has no entry in ``PIPELINE_REGISTRY``.
    """
    # Imported lazily, at call time rather than at module import.
    # pylint: disable=import-outside-toplevel
    from pipeline.pipelines import PIPELINE_REGISTRY  # type: ignore[import-not-found]

    if source_type not in PIPELINE_REGISTRY:
        known = sorted(PIPELINE_REGISTRY.keys())
        raise SystemExit(
            f" ABORT: source_type {source_type!r} not in PIPELINE_REGISTRY. "
            f"Known: {known}"
        )

    registered = PIPELINE_REGISTRY[source_type]
    is_factory = callable(registered) and not isinstance(registered, type)
    if is_factory:
        pipeline_cls = registered()
    else:
        pipeline_cls = registered
    return pipeline_cls(prod_db_path=db_path, staging_db_path=db_path)
def run(
    run_id: str,
    db_path: str,
    dry_run: bool,
    i_understand: bool,
    allow_prod: bool,
) -> int:
    """Report on a pipeline run and, unless ``dry_run``, roll it back.

    Steps, in order:

    1. Abort if the DB file is missing or, for a write, if the
       acknowledgement / prod opt-in flags are not satisfied.
    2. Open the DB read-only and print the manifest row, impact counts and
       (for ``13f_holdings`` runs whose run_id carries a quarter) the
       pre-state of ``holdings_v2``.
    3. Stop here on a dry run; otherwise call ``pipeline.rollback(run_id)``.
    4. Re-open the DB read-only and print the post-rollback status/state.

    Returns:
        0 on a dry run, a completed rollback, or a run that is already
        ``rolled_back``; 2 on every abort path.
    """
    if not os.path.exists(db_path):
        print(f" ABORT: DB not found: {db_path}")
        return 2

    # ---- write gates (only apply when a write was requested) -------------
    writing = not dry_run
    if writing and not i_understand:
        print(
            " ABORT: --confirm without --i-understand-this-writes. "
            "This operation writes to the database; pass "
            "--i-understand-this-writes to proceed."
        )
        return 2
    if writing and _is_prod_path(db_path) and not allow_prod:
        print(
            f" ABORT: {db_path} resolves to the prod DB "
            f"({PROD_DB_BASENAME}) but --allow-prod was not passed. "
            "Prod writes from this wrapper require explicit opt-in."
        )
        return 2

    # ---- dry-run read-only probe -----------------------------------------
    reader = duckdb.connect(db_path, read_only=True)
    try:
        entry = _load_manifest_row(reader, run_id)
        if entry is None:
            print(f" ABORT: run_id not found in ingestion_manifest: {run_id!r}")
            return 2

        manifest_id = entry["manifest_id"]
        source_type = entry["source_type"]
        status = entry["fetch_status"]
        if dry_run:
            mode = "dry-run"
        else:
            mode = "confirm"

        header = (
            (" DB : ", db_path),
            (" run_id : ", run_id),
            (" source_type : ", source_type),
            (" manifest_id : ", manifest_id),
            (" fetch_status : ", status),
            (" mode : ", mode),
        )
        for caption, value in header:
            print(f"{caption}{value}")

        # Already reversed: succeed without touching anything (idempotency).
        if status == "rolled_back":
            print(
                " NOOP: run is already in 'rolled_back' status. "
                "Nothing to do."
            )
            return 0
        if status != "complete":
            print(
                f" ABORT: rollback requires fetch_status='complete'; "
                f"current status is {status!r}."
            )
            return 2

        impact_rows = _impact_counts(reader, manifest_id)
        if not impact_rows:
            print(
                " ABORT: no ingestion_impacts rows for this manifest. "
                "Rollback would be a no-op and cannot reverse anything."
            )
            return 2
        print(" impact summary:")
        for unit_type, count in impact_rows:
            print(f" {unit_type:<20} {count:>10,}")

        # The holdings_v2 snapshot is only shown for 13f_holdings runs whose
        # run_id names a quarter.
        quarter = _reporting_quarter_for(run_id)
        show_holdings = bool(quarter) and source_type == "13f_holdings"
        if show_holdings:
            before = _holdings_v2_quarter_report(reader, quarter)
            print(f" pre-state holdings_v2 quarter={quarter}:")
            _print_report("", before)
    finally:
        # The read-only handle is released before any write is attempted.
        reader.close()

    if dry_run:
        print(" DRY-RUN: no writes performed. Pass --confirm to execute.")
        return 0

    # ---- execute rollback ------------------------------------------------
    pipeline = _build_pipeline(source_type, db_path)
    print(f" executing pipeline.rollback({run_id!r}) ...")
    pipeline.rollback(run_id)
    print(" rollback complete.")

    # ---- post-state report -----------------------------------------------
    reader = duckdb.connect(db_path, read_only=True)
    try:
        entry = _load_manifest_row(reader, run_id)
        if entry:
            status = entry["fetch_status"]
        else:
            status = "unknown"
        print(f" post status : {status}")
        if show_holdings:
            after = _holdings_v2_quarter_report(reader, quarter)
            print(f" post-state holdings_v2 quarter={quarter}:")
            _print_report("", after)
    finally:
        reader.close()
    return 0
def main(argv: list[str] | None = None) -> int:
    """Parse command-line flags and dispatch to ``run``.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is what the script entry point uses.

    Returns:
        The exit code produced by ``run``.

    A write happens only when ``--confirm`` is given and ``--dry-run`` is
    not. ``--dry-run`` always wins: it was previously parsed but ignored,
    so ``--dry-run --confirm --i-understand-this-writes`` performed the
    write despite the explicit request not to.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--run-id", required=True,
        help="run_id of the completed pipeline run to reverse.",
    )
    parser.add_argument(
        "--db", default=None,
        help="DB path. Defaults to data/13f.duckdb (prod).",
    )
    parser.add_argument(
        "--dry-run", action="store_true",
        help=(
            "Report actions; no writes. Default when --confirm is absent. "
            "Takes precedence over --confirm."
        ),
    )
    parser.add_argument(
        "--confirm", action="store_true",
        help="Execute the rollback. Requires --i-understand-this-writes.",
    )
    parser.add_argument(
        "--i-understand-this-writes", action="store_true",
        dest="i_understand_this_writes",
        help="Explicit acknowledgement that --confirm writes to the DB.",
    )
    parser.add_argument(
        "--allow-prod", action="store_true",
        help=f"Permit writes when --db resolves to {PROD_DB_BASENAME}.",
    )
    args = parser.parse_args(argv)

    # No --db means the prod database under the repository root.
    if args.db is None:
        args.db = str(ROOT / "data" / "13f.duckdb")

    # Safety first: an explicit --dry-run overrides --confirm.
    dry_run = args.dry_run or not args.confirm
    return run(
        run_id=args.run_id,
        db_path=args.db,
        dry_run=dry_run,
        i_understand=args.i_understand_this_writes,
        allow_prod=args.allow_prod,
    )
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())