-
Notifications
You must be signed in to change notification settings - Fork 113
Expand file tree
/
Copy pathutils.py
More file actions
121 lines (101 loc) · 4.07 KB
/
utils.py
File metadata and controls
121 lines (101 loc) · 4.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import os
import pathlib
import shutil
import tempfile
import uuid
from dataclasses import dataclass
from urllib.parse import urlparse
from deltalake import DeltaTable
# Connection settings for the CI services. Each value can be overridden
# through the environment variable named in the lookup; the second argument
# is the default used when that variable is not set.
MINIO_BUCKET = os.getenv("CI_MINIO_BUCKET", "ci-tests")
MINIO_ENDPOINT = os.getenv(
    "CI_MINIO_ENDPOINT",
    "http://minio.minio.svc.cluster.local:9000",
)
MINIO_REGION = os.getenv("CI_MINIO_REGION", "us-east-1")

# Kafka bootstrap address, read from KAFKA_BOOTSTRAP_SERVERS.
KAFKA_BOOTSTRAP = os.getenv(
    "KAFKA_BOOTSTRAP_SERVERS",
    "ci-kafka-bootstrap.kafka:9092",
)
def env_truthy(name: str) -> bool:
    """Return True when an environment variable is set to a truthy value.

    A variable is falsy when it is unset, or when its value -- ignoring
    surrounding whitespace and letter case -- is empty, ``0``, ``false`` or
    ``no``. Every other value is truthy.

    Args:
        name: Name of the environment variable to inspect.

    Returns:
        True if the variable is set to a truthy value, False otherwise.
    """
    value = os.environ.get(name)
    if value is None:
        return False
    # Strip before comparing so padded values such as "false " or a
    # whitespace-only value are treated like their unpadded counterparts
    # instead of silently counting as truthy.
    return value.strip().lower() not in {"", "0", "false", "no"}
def required_env(name: str) -> str:
    """Fetch an environment variable that must be present and non-empty.

    Args:
        name: Name of the environment variable to read.

    Returns:
        The variable's value.

    Raises:
        RuntimeError: If the variable is unset or set to an empty string.
    """
    # An unset variable is folded into the empty string so both cases are
    # rejected by the same check.
    found = os.getenv(name, "")
    if not found:
        raise RuntimeError(f"required environment variable '{name}' is not set")
    return found
def runs_in_ci() -> bool:
    """Report whether the ``CI`` environment variable marks this run as CI.

    Returns:
        The result of ``env_truthy("CI")``.
    """
    in_ci = env_truthy("CI")
    return in_ci
@dataclass
class DeltaTestLocation:
"""Describe where the Delta sink writes test data and how to read it back."""
uri: str
connector_config: dict[str, object]
root_path: str
local_dir: pathlib.Path | None = None
@classmethod
def create(cls, pipeline_name: str) -> "DeltaTestLocation":
"""Use the local filesystem for local runs and MinIO-backed S3 in CI."""
if runs_in_ci():
access_key = required_env("CI_K8S_MINIO_ACCESS_KEY_ID")
secret_key = required_env("CI_K8S_MINIO_SECRET_ACCESS_KEY")
prefix = f"{pipeline_name}/{uuid.uuid4().hex}"
root_path = f"{MINIO_BUCKET}/{prefix}"
minio_endpoint = MINIO_ENDPOINT.rstrip("/")
parsed_endpoint = urlparse(minio_endpoint)
if (
parsed_endpoint.scheme not in {"http", "https"}
or not parsed_endpoint.netloc
):
raise ValueError(
"CI_MINIO_ENDPOINT must be a full URL, e.g. "
"'http://minio.minio.svc.cluster.local:9000'"
)
return cls(
uri=f"s3://{root_path}",
connector_config={
"uri": f"s3://{root_path}",
"mode": "truncate",
"aws_access_key_id": access_key,
"aws_secret_access_key": secret_key,
"aws_region": MINIO_REGION,
"aws_endpoint": minio_endpoint,
"aws_allow_http": str(parsed_endpoint.scheme == "http").lower(),
},
root_path=root_path,
)
local_dir = pathlib.Path(
tempfile.mkdtemp(prefix=f"{pipeline_name}_delta_", dir="/tmp")
)
return cls(
uri=f"file://{local_dir}",
connector_config={
"uri": f"file://{local_dir}",
"mode": "truncate",
},
root_path=str(local_dir),
local_dir=local_dir,
)
def _delta_storage_options(self) -> dict[str, str]:
"""Return `deltalake` storage_options derived from the connector config."""
return {
k: str(v)
for k, v in self.connector_config.items()
if k not in ("uri", "mode")
}
def row_count(self) -> int:
"""Return the row count of the current Delta snapshot.
Uses the per-file `numRecords` stats recorded in the delta log, so
we never need to scan parquet (and never need pyarrow).
"""
dt = DeltaTable(self.uri, storage_options=self._delta_storage_options())
return dt.count()
def cleanup(self) -> None:
"""Remove the local temp directory, if any.
No-op on the CI/MinIO path: the bucket is ephemeral and the uuid
prefix prevents collisions, so leaked keys are acceptable.
"""
if self.local_dir is not None:
shutil.rmtree(self.local_dir, ignore_errors=True)
self.local_dir = None