Skip to content

Commit aed366b

Browse files
authored
S3 endpoint configuration feast-dev#1169 (feast-dev#1172)
* S3 endpoint configuration feast-dev#1169 Signed-off-by: mike0sv <mike0sv@gmail.com> * Add allow_no_value=True to ConfigParser Signed-off-by: mike0sv <mike0sv@gmail.com> * New constants API defaults extraction Signed-off-by: mike0sv <mike0sv@gmail.com> * fix for other types of get Signed-off-by: mike0sv <mike0sv@gmail.com> * return to the old logic and some testing Signed-off-by: mike0sv <mike0sv@gmail.com> * oooopsie Signed-off-by: mike0sv <mike0sv@gmail.com> * remove DEFAULTS logic changes Signed-off-by: mike0sv <mike0sv@gmail.com> * reformat Signed-off-by: mike0sv <mike0sv@gmail.com> * _upload_to_file_source docs Signed-off-by: mike0sv <mike0sv@gmail.com>
1 parent faa08fe commit aed366b

File tree

5 files changed

+41
-10
lines changed

5 files changed

+41
-10
lines changed

sdk/python/feast/client.py

Lines changed: 4 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -849,7 +849,9 @@ def ingest(
849849
try:
850850
if issubclass(type(feature_table.batch_source), FileSource):
851851
file_url = feature_table.batch_source.file_options.file_url.rstrip("*")
852-
_upload_to_file_source(file_url, with_partitions, dest_path)
852+
_upload_to_file_source(
853+
file_url, with_partitions, dest_path, self._config
854+
)
853855
if issubclass(type(feature_table.batch_source), BigQuerySource):
854856
bq_table_ref = feature_table.batch_source.bigquery_options.table_ref
855857
feature_table_timestamp_column = (
@@ -1004,6 +1006,7 @@ def get_historical_features(
10041006
entity_source = stage_entities_to_fs(
10051007
entity_source,
10061008
staging_location=self._config.get(opt.SPARK_STAGING_LOCATION),
1009+
config=self._config,
10071010
)
10081011

10091012
if self._use_job_service:

sdk/python/feast/constants.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,9 @@ class ConfigOptions(metaclass=ConfigMeta):
125125
#: Time to wait for historical feature requests before timing out.
126126
BATCH_FEATURE_REQUEST_WAIT_TIME_SECONDS: str = "600"
127127

128+
#: Endpoint URL for S3 storage_client
129+
S3_ENDPOINT_URL: Optional[str] = None
130+
128131
#: Authentication Provider - Google OpenID/OAuth
129132
#:
130133
#: Options: "google" / "oauth"

sdk/python/feast/loaders/ingest.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import pyarrow as pa
1010
from pyarrow import parquet as pq
1111

12+
from feast.config import Config
1213
from feast.staging.storage_client import get_staging_client
1314

1415

@@ -166,18 +167,21 @@ def _read_table_from_source(
166167

167168

168169
def _upload_to_file_source(
169-
file_url: str, with_partitions: bool, dest_path: str
170+
file_url: str, with_partitions: bool, dest_path: str, config: Config
170171
) -> None:
171172
"""
172173
Uploads data into a FileSource. Currently supports GCS, S3 and Local FS.
173174
174175
Args:
175176
file_url: file url of FileSource defined for FeatureTable
177+
with_partitions: whether to treat dest_path as dir with partitioned table
178+
dest_path: path to file or dir to be uploaded
179+
config: Config instance to configure FileSource
176180
"""
177181
from urllib.parse import urlparse
178182

179183
uri = urlparse(file_url)
180-
staging_client = get_staging_client(uri.scheme)
184+
staging_client = get_staging_client(uri.scheme, config)
181185

182186
if with_partitions:
183187
for path in glob.glob(os.path.join(dest_path, "**/*")):

sdk/python/feast/staging/entities.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import pandas as pd
99

10+
from feast.config import Config
1011
from feast.data_format import ParquetFormat
1112
from feast.data_source import BigQuerySource, FileSource
1213
from feast.staging.storage_client import get_staging_client
@@ -18,15 +19,15 @@
1819

1920

2021
def stage_entities_to_fs(
21-
entity_source: pd.DataFrame, staging_location: str
22+
entity_source: pd.DataFrame, staging_location: str, config: Config
2223
) -> FileSource:
2324
"""
2425
Dumps given (entities) dataframe as parquet file and stage it to remote file storage (subdirectory of staging_location)
2526
2627
:return: FileSource with remote destination path
2728
"""
2829
entity_staging_uri = urlparse(os.path.join(staging_location, str(uuid.uuid4())))
29-
staging_client = get_staging_client(entity_staging_uri.scheme)
30+
staging_client = get_staging_client(entity_staging_uri.scheme, config)
3031
with tempfile.NamedTemporaryFile() as df_export_path:
3132
entity_source.to_parquet(df_export_path.name)
3233
bucket = (

sdk/python/feast/staging/storage_client.py

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424

2525
from google.auth.exceptions import DefaultCredentialsError
2626

27+
from feast.config import Config
28+
from feast.constants import ConfigOptions as opt
29+
2730
GS = "gs"
2831
S3 = "s3"
2932
LOCAL_FILE = "file"
@@ -144,15 +147,15 @@ class S3Client(AbstractStagingClient):
144147
Implementation of AbstractStagingClient for Aws S3 storage
145148
"""
146149

147-
def __init__(self):
150+
def __init__(self, endpoint_url: str = None):
148151
try:
149152
import boto3
150153
except ImportError:
151154
raise ImportError(
152155
"Install package boto3 for s3 staging support"
153156
"run ```pip install boto3```"
154157
)
155-
self.s3_client = boto3.client("s3")
158+
self.s3_client = boto3.client("s3", endpoint_url=endpoint_url)
156159

157160
def download_file(self, uri: ParseResult) -> IO[bytes]:
158161
"""
@@ -275,21 +278,38 @@ def upload_file(self, local_path: str, bucket: str, remote_path: str):
275278
shutil.copy(local_path, dest_fpath)
276279

277280

278-
storage_clients = {GS: GCSClient, S3: S3Client, LOCAL_FILE: LocalFSClient}
281+
def _s3_client(config: Config = None):
    """Build an S3Client, honoring an optional endpoint override.

    Args:
        config: optional Config; when given, its S3_ENDPOINT_URL option
            (which may itself be None) is forwarded as a custom endpoint.

    Returns:
        A configured S3Client instance.
    """
    endpoint = None if config is None else config.get(opt.S3_ENDPOINT_URL, None)
    return S3Client(endpoint_url=endpoint)
287+
288+
289+
def _gcs_client(config: Config = None):
    """Build a GCSClient; ``config`` is accepted for factory-interface parity but unused."""
    return GCSClient()
291+
292+
293+
def _local_fs_client(config: Config = None):
    """Build a LocalFSClient; ``config`` is accepted for factory-interface parity but unused."""
    return LocalFSClient()
295+
296+
297+
# Maps a URI scheme ("gs", "s3", "file") to the factory that builds the
# matching staging client from an optional Config.
storage_clients = {GS: _gcs_client, S3: _s3_client, LOCAL_FILE: _local_fs_client}
279298

280299

281-
def get_staging_client(scheme):
300+
def get_staging_client(scheme, config: Config = None):
    """
    Initialization of a specific client object (GCSClient, S3Client etc.)

    Args:
        scheme (str): uri scheme: s3, gs or file
        config (Config): additional configuration (e.g. a custom S3 endpoint);
            may be None, in which case clients use their defaults

    Returns:
        An object of concrete implementation of AbstractStagingClient

    Raises:
        Exception: if the scheme is none of gs, s3 or file.
    """
    try:
        client_factory = storage_clients[scheme]
    except KeyError:
        # Bug fix: a missing dict key raises KeyError, not ValueError. The old
        # `except ValueError` never matched, so unknown schemes escaped as a
        # raw KeyError instead of this friendly message.
        raise Exception(
            f"Could not identify file scheme {scheme}. Only gs://, file:// and s3:// are supported"
        )
    return client_factory(config)

0 commit comments

Comments (0)