Skip to content

Commit 4da4b23

Browse files
jmelinav, terryylim, and woop
authored
Add support to Python SDK for staging files on Amazon S3 (#769)
* Add S3 staging support to python client SDK
* added pydocs
* PR comments
* change log and license info
* update to license
* lint error
* PR comments.
* get batch feature failure fix.
* Fix Mypy type issues
* Improve comments in storage client

Co-authored-by: Terence Lim <terencelimxp@gmail.com>
Co-authored-by: Willem Pienaar <git@willem.co>
1 parent f685726 commit 4da4b23

File tree

9 files changed

+524
-114
lines changed

9 files changed

+524
-114
lines changed

sdk/python/feast/job.py

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
1-
import tempfile
21
from typing import List
32
from urllib.parse import urlparse
43

54
import fastavro
65
import pandas as pd
7-
from google.cloud import storage
86
from google.protobuf.json_format import MessageToJson
97

108
from feast.constants import CONFIG_TIMEOUT_KEY
@@ -23,9 +21,17 @@
2321
from feast.serving.ServingService_pb2 import Job as JobProto
2422
from feast.serving.ServingService_pb2_grpc import ServingServiceStub
2523
from feast.source import Source
24+
from feast.staging.storage_client import get_staging_client
2625
from feast.wait import wait_retry_backoff
2726
from tensorflow_metadata.proto.v0 import statistics_pb2
2827

28+
# Maximum no of seconds to wait until the retrieval jobs status is DONE in Feast
29+
# Currently set to the maximum query execution time limit in BigQuery
30+
DEFAULT_TIMEOUT_SEC: int = 21600
31+
32+
# Maximum no of seconds to wait before reloading the job status in Feast
33+
MAX_WAIT_INTERVAL_SEC: int = 60
34+
2935

3036
class RetrievalJob:
3137
"""
@@ -42,8 +48,6 @@ def __init__(
4248
"""
4349
self.job_proto = job_proto
4450
self.serving_stub = serving_stub
45-
# TODO: abstract away GCP depedency
46-
self.gcs_client = storage.Client(project=None)
4751

4852
@property
4953
def id(self):
@@ -117,16 +121,7 @@ def result(self, timeout_sec: int = int(defaults[CONFIG_TIMEOUT_KEY])):
117121
"""
118122
uris = self.get_avro_files(timeout_sec)
119123
for file_uri in uris:
120-
if file_uri.scheme == "gs":
121-
file_obj = tempfile.TemporaryFile()
122-
self.gcs_client.download_blob_to_file(file_uri.geturl(), file_obj)
123-
elif file_uri.scheme == "file":
124-
file_obj = open(file_uri.path, "rb")
125-
else:
126-
raise Exception(
127-
f"Could not identify file URI {file_uri}. Only gs:// and file:// supported"
128-
)
129-
124+
file_obj = get_staging_client(file_uri.scheme).download_file(file_uri)
130125
file_obj.seek(0)
131126
avro_reader = fastavro.reader(file_obj)
132127

sdk/python/feast/loaders/file.py

Lines changed: 21 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,18 @@
1313
# limitations under the License.
1414

1515
import os
16-
import re
1716
import shutil
1817
import tempfile
1918
import uuid
2019
from datetime import datetime
2120
from typing import List, Optional, Tuple, Union
22-
from urllib.parse import ParseResult, urlparse
21+
from urllib.parse import urlparse
2322

2423
import pandas as pd
25-
from google.cloud import storage
2624
from pandavro import to_avro
2725

26+
from feast.staging.storage_client import get_staging_client
27+
2828

2929
def export_source_to_staging_location(
3030
source: Union[pd.DataFrame, str], staging_location_uri: str
@@ -44,12 +44,14 @@ def export_source_to_staging_location(
4444
* Pandas DataFrame
4545
* Local Avro file
4646
* GCS Avro file
47+
* S3 Avro file
4748
4849
4950
staging_location_uri (str):
5051
Remote staging location where DataFrame should be written.
5152
Examples:
5253
* gs://bucket/path/
54+
* s3://bucket/path/
5355
* file:///data/subfolder/
5456
5557
Returns:
@@ -66,52 +68,37 @@ def export_source_to_staging_location(
6668
uri_path = None # type: Optional[str]
6769
if uri.scheme == "file":
6870
uri_path = uri.path
69-
7071
# Remote gs staging location provided by serving
7172
dir_path, file_name, source_path = export_dataframe_to_local(
7273
df=source, dir_path=uri_path
7374
)
74-
elif urlparse(source).scheme in ["", "file"]:
75-
# Local file provided as a source
76-
dir_path = ""
77-
file_name = os.path.basename(source)
78-
source_path = os.path.abspath(
79-
os.path.join(urlparse(source).netloc, urlparse(source).path)
80-
)
81-
elif urlparse(source).scheme == "gs":
82-
# Google Cloud Storage path provided
83-
input_source_uri = urlparse(source)
84-
if "*" in source:
85-
# Wildcard path
86-
return _get_files(
87-
bucket=str(input_source_uri.hostname), uri=input_source_uri
75+
elif isinstance(source, str):
76+
source_uri = urlparse(source)
77+
if source_uri.scheme in ["", "file"]:
78+
# Local file provided as a source
79+
dir_path = ""
80+
file_name = os.path.basename(source)
81+
source_path = os.path.abspath(
82+
os.path.join(source_uri.netloc, source_uri.path)
8883
)
8984
else:
90-
return [source]
85+
# gs, s3 file provided as a source.
86+
return get_staging_client(source_uri.scheme).list_files(
87+
bucket=source_uri.hostname, path=source_uri.path
88+
)
9189
else:
9290
raise Exception(
9391
f"Only string and DataFrame types are allowed as a "
9492
f"source, {type(source)} was provided."
9593
)
9694

9795
# Push data to required staging location
98-
if uri.scheme == "gs":
99-
# Staging location is a Google Cloud Storage path
100-
upload_file_to_gcs(
101-
source_path, str(uri.hostname), str(uri.path).strip("/") + "/" + file_name
102-
)
103-
elif uri.scheme == "file":
104-
# Staging location is a file path
105-
# Used for end-to-end test
106-
pass
107-
else:
108-
raise Exception(
109-
f"Staging location {staging_location_uri} does not have a "
110-
f"valid URI. Only gs:// and file:// uri scheme are supported."
111-
)
96+
get_staging_client(uri.scheme).upload_file(
97+
source_path, uri.hostname, str(uri.path).strip("/") + "/" + file_name,
98+
)
11299

113100
# Clean up, remove local staging file
114-
if isinstance(source, pd.DataFrame) and len(str(dir_path)) > 4:
101+
if dir_path and isinstance(source, pd.DataFrame) and len(dir_path) > 4:
115102
shutil.rmtree(dir_path)
116103

117104
return [staging_location_uri.rstrip("/") + "/" + file_name]
@@ -162,70 +149,6 @@ def export_dataframe_to_local(
162149
return dir_path, file_name, dest_path
163150

164151

165-
def upload_file_to_gcs(local_path: str, bucket: str, remote_path: str) -> None:
166-
"""
167-
Upload a file from the local file system to Google Cloud Storage (GCS).
168-
169-
Args:
170-
local_path (str):
171-
Local filesystem path of file to upload.
172-
173-
bucket (str):
174-
GCS bucket destination to upload to.
175-
176-
remote_path (str):
177-
Path within GCS bucket to upload file to, includes file name.
178-
179-
Returns:
180-
None:
181-
None
182-
"""
183-
184-
storage_client = storage.Client(project=None)
185-
bucket_storage = storage_client.get_bucket(bucket)
186-
blob = bucket_storage.blob(remote_path)
187-
blob.upload_from_filename(local_path)
188-
189-
190-
def _get_files(bucket: str, uri: ParseResult) -> List[str]:
191-
"""
192-
List all available files within a Google storage bucket that matches a wild
193-
card path.
194-
195-
Args:
196-
bucket (str):
197-
Google Storage bucket to reference.
198-
199-
uri (urllib.parse.ParseResult):
200-
Wild card uri path containing the "*" character.
201-
Example:
202-
* gs://feast/staging_location/*
203-
* gs://feast/staging_location/file_*.avro
204-
205-
Returns:
206-
List[str]:
207-
List of all available files matching the wildcard path.
208-
"""
209-
210-
storage_client = storage.Client(project=None)
211-
bucket_storage = storage_client.get_bucket(bucket)
212-
path = uri.path
213-
214-
if "*" in path:
215-
regex = re.compile(path.replace("*", ".*?").strip("/"))
216-
blob_list = bucket_storage.list_blobs(
217-
prefix=path.strip("/").split("*")[0], delimiter="/"
218-
)
219-
# File path should not be in path (file path must be longer than path)
220-
return [
221-
f"{uri.scheme}://{uri.hostname}/{file}"
222-
for file in [x.name for x in blob_list]
223-
if re.match(regex, file) and file not in path
224-
]
225-
else:
226-
raise Exception(f"{path} is not a wildcard path")
227-
228-
229152
def _get_file_name() -> str:
230153
"""
231154
Create a random file name.

sdk/python/feast/staging/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)