|
24 | 24 |
|
25 | 25 | from google.auth.exceptions import DefaultCredentialsError |
26 | 26 |
|
| 27 | +from feast.config import Config |
| 28 | +from feast.constants import ConfigOptions as opt |
| 29 | + |
27 | 30 | GS = "gs" |
28 | 31 | S3 = "s3" |
29 | 32 | LOCAL_FILE = "file" |
@@ -144,15 +147,15 @@ class S3Client(AbstractStagingClient): |
144 | 147 | Implementation of AbstractStagingClient for Aws S3 storage |
145 | 148 | """ |
146 | 149 |
|
147 | | - def __init__(self): |
| 150 | + def __init__(self, endpoint_url: str = None): |
148 | 151 | try: |
149 | 152 | import boto3 |
150 | 153 | except ImportError: |
151 | 154 | raise ImportError( |
152 | 155 | "Install package boto3 for s3 staging support" |
153 | 156 | "run ```pip install boto3```" |
154 | 157 | ) |
155 | | - self.s3_client = boto3.client("s3") |
| 158 | + self.s3_client = boto3.client("s3", endpoint_url=endpoint_url) |
156 | 159 |
|
157 | 160 | def download_file(self, uri: ParseResult) -> IO[bytes]: |
158 | 161 | """ |
@@ -275,21 +278,38 @@ def upload_file(self, local_path: str, bucket: str, remote_path: str): |
275 | 278 | shutil.copy(local_path, dest_fpath) |
276 | 279 |
|
277 | 280 |
|
278 | | -storage_clients = {GS: GCSClient, S3: S3Client, LOCAL_FILE: LocalFSClient} |
| 281 | +def _s3_client(config: Config = None): |
| 282 | + if config is None: |
| 283 | + endpoint_url = None |
| 284 | + else: |
| 285 | + endpoint_url = config.get(opt.S3_ENDPOINT_URL, None) |
| 286 | + return S3Client(endpoint_url=endpoint_url) |
| 287 | + |
| 288 | + |
| 289 | +def _gcs_client(config: Config = None): |
| 290 | + return GCSClient() |
| 291 | + |
| 292 | + |
| 293 | +def _local_fs_client(config: Config = None): |
| 294 | + return LocalFSClient() |
| 295 | + |
| 296 | + |
| 297 | +storage_clients = {GS: _gcs_client, S3: _s3_client, LOCAL_FILE: _local_fs_client} |
279 | 298 |
|
280 | 299 |
|
281 | | -def get_staging_client(scheme): |
| 300 | +def get_staging_client(scheme, config: Config = None): |
282 | 301 | """ |
283 | 302 | Initialization of a specific client object(GCSClient, S3Client etc.) |
284 | 303 |
|
285 | 304 | Args: |
286 | 305 | scheme (str): uri scheme: s3, gs or file |
| 306 | + config (Config): additional configuration |
287 | 307 |
|
288 | 308 | Returns: |
289 | 309 | An object of concrete implementation of AbstractStagingClient |
290 | 310 | """ |
291 | 311 | try: |
292 | | - return storage_clients[scheme]() |
| 312 | + return storage_clients[scheme](config) |
293 | 313 | except ValueError: |
294 | 314 | raise Exception( |
295 | 315 | f"Could not identify file scheme {scheme}. Only gs://, file:// and s3:// are supported" |
|
0 commit comments