-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcreate_dev_workspace.py
More file actions
78 lines (67 loc) · 2.42 KB
/
create_dev_workspace.py
File metadata and controls
78 lines (67 loc) · 2.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
#!/usr/bin/env python3
from __future__ import annotations
import argparse
from pathlib import Path
import shutil
import sys
def main() -> int:
    """Create a local dev/test workspace and populate it with fixture datasets.

    Parses the command line, optionally deletes an existing workspace
    (``--reset``), creates the workspace skeleton, and, unless
    ``--skip-import`` is given, imports every regular file found directly
    inside ``--fixtures-dir`` into the workspace. The workspace path is
    printed on stdout as the last step.

    Fixture import is best-effort: a failing fixture never aborts the run.
    Without ``--overwrite-fixtures`` a failure is skipped silently; with
    ``--overwrite-fixtures`` a warning naming the fixture is written to
    stderr so that a requested re-import cannot fail unnoticed.

    Returns:
        The process exit code, always ``0`` when the function returns
        normally. Errors raised outside the per-fixture import (argument
        parsing, workspace creation, settings loading) propagate.
    """
    # The script sits one directory below the repository root.
    repo_root = Path(__file__).resolve().parents[1]
    # Put <repo>/src first on sys.path so the codexdatalab imports below
    # resolve from this checkout; this is why they are function-local.
    sys.path.insert(0, str(repo_root / "src"))

    from codexdatalab.data_ops import import_dataset
    from codexdatalab.settings import load_settings
    from codexdatalab.workspace import load_workspace
    from codexdatalab.workspace_scaffold import create_workspace_skeleton

    parser = argparse.ArgumentParser(description="Create a local dev/test workspace (gitignored).")
    parser.add_argument(
        "--path",
        type=Path,
        default=repo_root / ".codexdatalab_test_workspace",
        help="Workspace path to create (default: repo/.codexdatalab_test_workspace).",
    )
    parser.add_argument(
        "--fixtures-dir",
        type=Path,
        default=repo_root / "tests" / "fixtures",
        help="Directory containing raw-data fixtures to copy into raw/.",
    )
    parser.add_argument(
        "--reset",
        action="store_true",
        help="Delete and recreate the workspace directory before populating fixtures.",
    )
    parser.add_argument(
        "--overwrite-fixtures",
        action="store_true",
        help="Re-import fixtures even if already present (without deleting the whole workspace).",
    )
    parser.add_argument(
        "--skip-import",
        action="store_true",
        help="Create the workspace skeleton but skip importing fixtures.",
    )
    args = parser.parse_args()

    if args.reset and args.path.exists():
        shutil.rmtree(args.path)

    create_workspace_skeleton(args.path)

    if not args.skip_import:
        settings = load_settings()
        workspace = load_workspace(args.path, settings)
        fixtures_dir = args.fixtures_dir
        # A missing fixtures directory is not an error: nothing is imported.
        if fixtures_dir.is_dir():
            # Only regular files directly inside the directory, in sorted
            # order so repeated runs process them deterministically.
            fixtures = sorted(p for p in fixtures_dir.iterdir() if p.is_file())
            for fixture in fixtures:
                try:
                    import_dataset(
                        workspace,
                        fixture,
                        link=False,
                        force_copy=args.overwrite_fixtures,
                        # Answer any prompt non-interactively with "c".
                        # NOTE(review): meaning of "c" is defined by
                        # import_dataset; confirm against its implementation.
                        prompt=lambda _: "c",
                    )
                except Exception as exc:
                    if not args.overwrite_fixtures:
                        # Default mode: skip quietly, presumably because the
                        # fixture is already present.
                        # TODO: confirm which exception import_dataset raises
                        # for that case and narrow this handler.
                        continue
                    # A re-import was explicitly requested, so a failure here
                    # is unexpected. Report it on stderr (stdout carries only
                    # the workspace path) and keep going.
                    print(
                        f"warning: failed to import fixture {fixture}: {exc}",
                        file=sys.stderr,
                    )

    print(args.path)
    return 0
if __name__ == "__main__":
    # Run only when executed as a script; main()'s return value becomes the
    # process exit status.
    sys.exit(main())