Skip to content

Commit 99ff094

Browse files
authored
change azure https to wasbs and add azure creds to spark (feast-dev#1258)
* change azure https to wasbs and add azure creds to spark Signed-off-by: Jacob Klegar <jacob@tecton.ai> * lint Signed-off-by: Jacob Klegar <jacob@tecton.ai>
1 parent 1fdd44b commit 99ff094

File tree

5 files changed

+35
-13
lines changed

5 files changed

+35
-13
lines changed

.prow/config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ presubmits:
296296
- name: DOCKER_REPOSITORY
297297
value: gcr.io/kf-feast
298298
- name: STAGING_PATH
299-
value: https://feastcicd.blob.core.windows.net/staging/cicd-staging
299+
value: wasbs://staging@feastcicd.blob.core.windows.net/cicd-staging
300300
- name: AZ_SERVICE_PRINCIPAL_ID
301301
valueFrom:
302302
secretKeyRef:

sdk/python/feast/loaders/file.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def export_source_to_staging_location(
5353
Examples:
5454
* gs://bucket/path/
5555
* s3://bucket/path/
56-
* https://account_name.blob.core.windows.net/bucket/path/
56+
* wasbs://bucket@account_name.blob.core.windows.net/path/
5757
* file:///data/subfolder/
5858
5959
Returns:

sdk/python/feast/pyspark/launchers/k8s/k8s.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,20 @@ def _get_staging_client(self):
184184
uri = urlparse(self._staging_location)
185185
return get_staging_client(uri.scheme, self._config)
186186

187+
def _get_azure_credentials(self):
188+
uri = urlparse(self._staging_location)
189+
if uri.scheme != "wasbs":
190+
return {}
191+
account_name = self._config.get(opt.AZURE_BLOB_ACCOUNT_NAME)
192+
account_key = self._config.get(opt.AZURE_BLOB_ACCOUNT_ACCESS_KEY)
193+
if account_name is None or account_key is None:
194+
raise Exception(
195+
f"Using Azure blob storage requires {opt.AZURE_BLOB_ACCOUNT_NAME} and {opt.AZURE_BLOB_ACCOUNT_ACCESS_KEY} to be set in config"
196+
)
197+
return {
198+
f"spark.hadoop.fs.azure.account.key.{account_name}.blob.core.windows.net": f"{account_key}"
199+
}
200+
187201
def historical_feature_retrieval(
188202
self, job_params: RetrievalJobParameters
189203
) -> RetrievalJob:
@@ -221,6 +235,7 @@ def historical_feature_retrieval(
221235
packages=[],
222236
jars=[],
223237
extra_metadata={METADATA_OUTPUT_URI: job_params.get_destination_path()},
238+
azure_credentials=self._get_azure_credentials(),
224239
arguments=job_params.get_arguments(),
225240
namespace=self._namespace,
226241
)
@@ -275,6 +290,7 @@ def offline_to_online_ingestion(
275290
packages=[BQ_SPARK_PACKAGE],
276291
jars=[],
277292
extra_metadata={},
293+
azure_credentials=self._get_azure_credentials(),
278294
arguments=ingestion_job_params.get_arguments(),
279295
namespace=self._namespace,
280296
)
@@ -317,6 +333,7 @@ def start_stream_to_online_ingestion(
317333
packages=[BQ_SPARK_PACKAGE],
318334
jars=extra_jar_paths,
319335
extra_metadata={METADATA_JOBHASH: job_hash},
336+
azure_credentials=self._get_azure_credentials(),
320337
arguments=ingestion_job_params.get_arguments(),
321338
namespace=self._namespace,
322339
)

sdk/python/feast/pyspark/launchers/k8s/k8s_utils.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ def _prepare_job_resource(
111111
packages: List[str],
112112
jars: List[str],
113113
extra_metadata: Dict[str, str],
114+
azure_credentials: Dict[str, str],
114115
arguments: List[str],
115116
namespace: str,
116117
) -> Dict[str, Any]:
@@ -130,6 +131,7 @@ def _prepare_job_resource(
130131
_add_keys(job, ("spec",), dict(arguments=arguments))
131132

132133
_add_keys(job, ("spec", "sparkConf"), extra_metadata)
134+
_add_keys(job, ("spec", "sparkConf"), azure_credentials)
133135

134136
_append_items(job, ("spec", "deps", "packages"), packages)
135137
_append_items(job, ("spec", "deps", "jars"), jars)

sdk/python/feast/staging/storage_client.py

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
GS = "gs"
3131
S3 = "s3"
3232
S3A = "s3a"
33-
AZURE_SCHEME = "https"
33+
AZURE_SCHEME = "wasbs"
3434
LOCAL_FILE = "file"
3535

3636

@@ -326,17 +326,18 @@ def __init__(self, account_name: str, account_access_key: str):
326326
"Install package azure-storage-blob for azure blob staging support"
327327
"run ```pip install azure-storage-blob```"
328328
)
329-
self.account_url = f"https://{account_name}.blob.core.windows.net"
329+
self.account_name = account_name
330+
account_url = f"https://{account_name}.blob.core.windows.net"
330331
self.blob_service_client = BlobServiceClient(
331-
account_url=self.account_url, credential=account_access_key
332+
account_url=account_url, credential=account_access_key
332333
)
333334

334335
def download_file(self, uri: ParseResult) -> IO[bytes]:
335336
"""
336337
Downloads a file from Azure blob storage and returns a TemporaryFile object
337338
338339
Args:
339-
uri (urllib.parse.ParseResult): Parsed uri of the file ex: urlparse("https://account_name.blob.core.windows.net/bucket/file.avro")
340+
uri (urllib.parse.ParseResult): Parsed uri of the file ex: urlparse("wasbs://bucket@account_name.blob.core.windows.net/file.avro")
340341
341342
Returns:
342343
TemporaryFile object
@@ -366,17 +367,19 @@ def list_files(self, uri: ParseResult) -> List[str]:
366367
)
367368
# File path should not be in path (file path must be longer than path)
368369
return [
369-
f"{self.account_url}/{bucket}/{file}"
370+
f"wasbs://{bucket}@{self.account_name}.blob.core.windows.net/{file}"
370371
for file in [x.name for x in blob_list]
371372
if re.match(regex, file) and file not in path
372373
]
373374
else:
374-
return [f"{self.account_url}/{bucket}/{path}"]
375+
return [
376+
f"wasbs://{bucket}@{self.account_name}.blob.core.windows.net/{path}"
377+
]
375378

376379
def _uri_to_bucket_key(self, uri: ParseResult) -> Tuple[str, str]:
377-
assert uri.hostname == urlparse(self.account_url).hostname
378-
bucket = uri.path.lstrip("/").split("/")[0]
379-
key = uri.path.lstrip("/").split("/", 1)[1]
380+
assert uri.hostname == f"{self.account_name}.blob.core.windows.net"
381+
bucket = uri.username
382+
key = uri.path.lstrip("/")
380383
return bucket, key
381384

382385
def upload_fileobj(
@@ -485,7 +488,7 @@ def _local_fs_client(config: Config = None):
485488
GS: _gcs_client,
486489
S3: _s3_client,
487490
S3A: _s3a_client,
488-
AZURE_SCHEME: _azure_blob_client, # note we currently interpret all uris beginning https:// as Azure blob uris
491+
AZURE_SCHEME: _azure_blob_client,
489492
LOCAL_FILE: _local_fs_client,
490493
}
491494

@@ -505,5 +508,5 @@ def get_staging_client(scheme, config: Config = None) -> AbstractStagingClient:
505508
return storage_clients[scheme](config)
506509
except ValueError:
507510
raise Exception(
508-
f"Could not identify file scheme {scheme}. Only gs://, file://, s3:// and https:// (for Azure) are supported"
511+
f"Could not identify file scheme {scheme}. Only gs://, file://, s3:// and wasbs:// (for Azure) are supported"
509512
)

0 commit comments

Comments
 (0)