Skip to content

Commit 5f3ddcb

Browse files
authored
Refactor staging client uploader and use it in EMR launcher (feast-dev#1219)
Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>
1 parent 96b082b commit 5f3ddcb

9 files changed

Lines changed: 205 additions & 165 deletions

File tree

sdk/python/feast/constants.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ class ConfigOptions(metaclass=ConfigMeta):
140140

141141
#: Feast Spark Job ingestion jobs staging location. The choice of storage is connected to the choice of SPARK_LAUNCHER.
142142
#:
143-
#: Eg. gs://some-bucket/output/, s3://some-bucket/output/, file://data/subfolder/
143+
#: Eg. gs://some-bucket/output/, s3://some-bucket/output/, file:///data/subfolder/
144144
SPARK_STAGING_LOCATION: Optional[str] = None
145145

146146
#: Feast Spark Job ingestion jar file. The choice of storage is connected to the choice of SPARK_LAUNCHER.
@@ -206,7 +206,7 @@ class ConfigOptions(metaclass=ConfigMeta):
206206

207207
#: Ingestion Job DeadLetter Destination. The choice of storage is connected to the choice of SPARK_LAUNCHER.
208208
#:
209-
#: Eg. gs://some-bucket/output/, s3://some-bucket/output/, file://data/subfolder/
209+
#: Eg. gs://some-bucket/output/, s3://some-bucket/output/, file:///data/subfolder/
210210
DEADLETTER_PATH: str = ""
211211

212212
#: ProtoRegistry Address (currently only Stencil Server is supported as registry)

sdk/python/feast/loaders/file.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ def export_source_to_staging_location(
7777
)
7878
else:
7979
# gs, s3 file provided as a source.
80+
assert source_uri.hostname is not None
8081
return get_staging_client(source_uri.scheme).list_files(
8182
bucket=source_uri.hostname, path=source_uri.path
8283
)
@@ -87,9 +88,12 @@ def export_source_to_staging_location(
8788
)
8889

8990
# Push data to required staging location
90-
get_staging_client(uri.scheme).upload_file(
91-
source_path, uri.hostname, str(uri.path).strip("/") + "/" + file_name,
92-
)
91+
with open(source_path, "rb") as f:
92+
get_staging_client(uri.scheme).upload_fileobj(
93+
f,
94+
source_path,
95+
remote_uri=uri._replace(path=str(uri.path).strip("/") + "/" + file_name),
96+
)
9397

9498
# Clean up, remove local staging file
9599
if dir_path and isinstance(source, pd.DataFrame) and len(dir_path) > 4:

sdk/python/feast/loaders/ingest.py

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -187,16 +187,28 @@ def _upload_to_file_source(
187187
for path in glob.glob(os.path.join(dest_path, "**/*")):
188188
file_name = path.split("/")[-1]
189189
partition_col = path.split("/")[-2]
190-
staging_client.upload_file(
191-
path,
192-
uri.hostname,
193-
str(uri.path).strip("/") + "/" + partition_col + "/" + file_name,
194-
)
190+
with open(path, "rb") as f:
191+
staging_client.upload_fileobj(
192+
f,
193+
path,
194+
remote_uri=uri._replace(
195+
path=str(uri.path).rstrip("/")
196+
+ "/"
197+
+ partition_col
198+
+ "/"
199+
+ file_name
200+
),
201+
)
195202
else:
196203
file_name = dest_path.split("/")[-1]
197-
staging_client.upload_file(
198-
dest_path, uri.hostname, str(uri.path).strip("/") + "/" + file_name,
199-
)
204+
with open(dest_path, "rb") as f:
205+
staging_client.upload_fileobj(
206+
f,
207+
dest_path,
208+
remote_uri=uri._replace(
209+
path=str(uri.path).rstrip("/") + "/" + file_name
210+
),
211+
)
200212

201213

202214
def _upload_to_bq_source(

sdk/python/feast/pyspark/launchers/aws/emr.py

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import tempfile
33
from io import BytesIO
44
from typing import Any, Dict, List, Optional
5+
from urllib.parse import urlunparse
56

67
import boto3
78
import pandas
@@ -21,6 +22,7 @@
2122
StreamIngestionJob,
2223
StreamIngestionJobParameters,
2324
)
25+
from feast.staging.storage_client import get_staging_client
2426

2527
from .emr_utils import (
2628
FAILED_STEP_STATES,
@@ -39,7 +41,6 @@
3941
_list_jobs,
4042
_load_new_cluster_template,
4143
_random_string,
42-
_s3_upload,
4344
_stream_ingestion_step,
4445
_sync_offline_to_online_step,
4546
_upload_jar,
@@ -221,11 +222,13 @@ def historical_feature_retrieval(
221222
with open(job_params.get_main_file_path()) as f:
222223
pyspark_script = f.read()
223224

224-
pyspark_script_path = _s3_upload(
225-
BytesIO(pyspark_script.encode("utf8")),
226-
local_path="historical_retrieval.py",
227-
remote_path_prefix=self._staging_location,
228-
remote_path_suffix=".py",
225+
pyspark_script_path = urlunparse(
226+
get_staging_client("s3").upload_fileobj(
227+
BytesIO(pyspark_script.encode("utf8")),
228+
local_path="historical_retrieval.py",
229+
remote_path_prefix=self._staging_location,
230+
remote_path_suffix=".py",
231+
)
229232
)
230233

231234
step = _historical_retrieval_step(
@@ -304,12 +307,18 @@ def start_stream_to_online_ingestion(
304307
def stage_dataframe(self, df: pandas.DataFrame, event_timestamp: str) -> FileSource:
305308
with tempfile.NamedTemporaryFile() as f:
306309
df.to_parquet(f)
307-
file_url = _s3_upload(
308-
f,
309-
f.name,
310-
remote_path_prefix=os.path.join(self._staging_location, "dataframes"),
311-
remote_path_suffix=".parquet",
310+
311+
file_url = urlunparse(
312+
get_staging_client("s3").upload_fileobj(
313+
f,
314+
f.name,
315+
remote_path_prefix=os.path.join(
316+
self._staging_location, "dataframes"
317+
),
318+
remote_path_suffix=".parquet",
319+
)
312320
)
321+
313322
return FileSource(
314323
event_timestamp_column=event_timestamp,
315324
file_format=ParquetFormat(),

sdk/python/feast/pyspark/launchers/aws/emr_utils.py

Lines changed: 8 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
1-
import hashlib
21
import logging
32
import os
43
import random
54
import string
65
import time
7-
from typing import IO, Any, Dict, List, NamedTuple, Optional, Tuple
6+
from typing import Any, Dict, List, NamedTuple, Optional
7+
from urllib.parse import urlparse, urlunparse
88

9-
import boto3
10-
import botocore
119
import yaml
1210

1311
__all__ = [
@@ -27,12 +25,12 @@
2725
"_list_jobs",
2826
"_load_new_cluster_template",
2927
"_random_string",
30-
"_s3_upload",
3128
"_stream_ingestion_step",
3229
"_sync_offline_to_online_step",
3330
"_upload_jar",
3431
"_wait_for_job_state",
3532
]
33+
from feast.staging.storage_client import get_staging_client
3634

3735
log = logging.getLogger("aws")
3836

@@ -77,82 +75,13 @@ def _random_string(length) -> str:
7775
return "".join(random.choice(string.ascii_lowercase) for _ in range(length))
7876

7977

80-
def _s3_split_path(path: str) -> Tuple[str, str]:
81-
""" Convert s3:// url to (bucket, key) """
82-
assert path.startswith("s3://")
83-
_, _, bucket, key = path.split("/", 3)
84-
return bucket, key
85-
86-
87-
def _hash_fileobj(fileobj: IO[bytes]) -> str:
88-
""" Compute sha256 hash of a file. File pointer will be reset to 0 on return. """
89-
fileobj.seek(0)
90-
h = hashlib.sha256()
91-
for block in iter(lambda: fileobj.read(2 ** 20), b""):
92-
h.update(block)
93-
fileobj.seek(0)
94-
return h.hexdigest()
95-
96-
97-
def _s3_upload(
98-
fileobj: IO[bytes],
99-
local_path: str,
100-
*,
101-
remote_path: Optional[str] = None,
102-
remote_path_prefix: Optional[str] = None,
103-
remote_path_suffix: Optional[str] = None,
104-
) -> str:
105-
"""
106-
Upload a local file to S3. We store the file sha256 sum in S3 metadata and skip the upload
107-
if the file hasn't changed.
108-
109-
You can either specify remote_path or remote_path_prefix+remote_path_suffix. In the latter case,
110-
the remote path will be computed as $remote_path_prefix/$sha256$remote_path_suffix
111-
"""
112-
113-
assert (remote_path is not None) or (
114-
remote_path_prefix is not None and remote_path_suffix is not None
115-
)
116-
117-
sha256sum = _hash_fileobj(fileobj)
118-
119-
if remote_path is None:
120-
assert remote_path_prefix is not None
121-
remote_path = os.path.join(
122-
remote_path_prefix, f"{sha256sum}{remote_path_suffix}"
123-
)
124-
125-
bucket, key = _s3_split_path(remote_path)
126-
client = boto3.client("s3")
127-
128-
try:
129-
head_response = client.head_object(Bucket=bucket, Key=key)
130-
if head_response["Metadata"]["sha256sum"] == sha256sum:
131-
# File already exists
132-
return remote_path
133-
else:
134-
log.info("Uploading {local_path} to {remote_path}")
135-
client.upload_fileobj(
136-
fileobj, bucket, key, ExtraArgs={"Metadata": {"sha256sum": sha256sum}},
137-
)
138-
return remote_path
139-
except botocore.exceptions.ClientError as e:
140-
if e.response["Error"]["Code"] == "404":
141-
log.info("Uploading {local_path} to {remote_path}")
142-
client.upload_fileobj(
143-
fileobj, bucket, key, ExtraArgs={"Metadata": {"sha256sum": sha256sum}},
144-
)
145-
return remote_path
146-
else:
147-
raise
148-
149-
15078
def _upload_jar(jar_s3_prefix: str, local_path: str) -> str:
15179
with open(local_path, "rb") as f:
152-
return _s3_upload(
153-
f,
154-
local_path,
155-
remote_path=os.path.join(jar_s3_prefix, os.path.basename(local_path)),
80+
uri = urlparse(os.path.join(jar_s3_prefix, os.path.basename(local_path)))
81+
return urlunparse(
82+
get_staging_client(uri.scheme).upload_fileobj(
83+
f, local_path, remote_uri=uri,
84+
)
15685
)
15786

15887

sdk/python/feast/pyspark/launchers/gcloud/dataproc.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -253,10 +253,16 @@ def _stage_file(self, file_path: str, job_id: str) -> str:
253253
return file_path
254254

255255
staging_client = get_staging_client("gs")
256-
blob_path = os.path.join(self.remote_path, job_id, os.path.basename(file_path),)
257-
staging_client.upload_file(file_path, self.staging_bucket, blob_path)
256+
blob_path = os.path.join(
257+
self.remote_path, job_id, os.path.basename(file_path),
258+
).lstrip("/")
259+
blob_uri_str = f"gs://{self.staging_bucket}/{blob_path}"
260+
with open(file_path, "rb") as f:
261+
staging_client.upload_fileobj(
262+
f, file_path, remote_uri=urlparse(blob_uri_str)
263+
)
258264

259-
return f"gs://{self.staging_bucket}/{blob_path}"
265+
return blob_uri_str
260266

261267
def dataproc_submit(
262268
self, job_params: SparkJobParameters

sdk/python/feast/staging/entities.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,11 @@ def stage_entities_to_fs(
3535
)
3636

3737
entity_source.to_parquet(df_export_path.name)
38-
bucket = (
39-
None if entity_staging_uri.scheme == "file" else entity_staging_uri.netloc
40-
)
41-
staging_client.upload_file(
42-
df_export_path.name, bucket, entity_staging_uri.path.lstrip("/")
43-
)
38+
39+
with open(df_export_path.name, "rb") as f:
40+
staging_client.upload_fileobj(
41+
f, df_export_path.name, remote_uri=entity_staging_uri
42+
)
4443

4544
# ToDo: support custom event_timestamp_column
4645
return FileSource(

0 commit comments

Comments (0)