Skip to content

Commit 583275f

Browse files
committed
feat: support hdfs staging for Spark offline store
1 parent eb51f00 commit 583275f

File tree

1 file changed

+23
-2
lines changed
  • sdk/python/feast/infra/offline_stores/contrib/spark_offline_store

1 file changed

+23
-2
lines changed

sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,7 @@ def persist(
442442
self.to_spark_df().write.format(file_format).saveAsTable(table_name)
443443
else:
444444
self.to_spark_df().createOrReplaceTempView(table_name)
445+
445446

446447
def _has_remote_warehouse_in_config(self) -> bool:
447448
"""
@@ -497,15 +498,35 @@ def to_remote_storage(self) -> List[str]:
497498
return aws_utils.list_s3_files(
498499
self._config.offline_store.region, output_uri
499500
)
500-
501+
elif self._config.offline_store.staging_location.startswith("hdfs://"):
502+
output_uri = os.path.join(
503+
self._config.offline_store.staging_location, str(uuid.uuid4())
504+
)
505+
sdf.write.parquet(output_uri)
506+
spark_session = get_spark_session_or_start_new_with_repoconfig(
507+
store_config=self._config.offline_store
508+
)
509+
return self._list_hdfs_files(spark_session, output_uri)
501510
else:
502511
raise NotImplementedError(
503-
"to_remote_storage is only implemented for file:// and s3:// uri schemes"
512+
"to_remote_storage is only implemented for file://, s3:// and hdfs:// uri schemes"
504513
)
505514

506515
else:
507516
raise NotImplementedError()
508517

518+
def _list_hdfs_files(self, spark_session: SparkSession, uri: str) -> List[str]:
519+
jvm = spark_session._jvm
520+
conf = spark_session._jsc.hadoopConfiguration()
521+
path = jvm.org.apache.hadoop.fs.Path(uri)
522+
fs = jvm.org.apache.hadoop.fs.FileSystem.get(path.toUri(), conf)
523+
statuses = fs.listStatus(path)
524+
files = []
525+
for f in statuses:
526+
if f.isFile():
527+
files.append(f.getPath().toString())
528+
return files
529+
509530
@property
510531
def metadata(self) -> Optional[RetrievalMetadata]:
511532
"""

0 commit comments

Comments
 (0)