@@ -442,6 +442,7 @@ def persist(
442442 self .to_spark_df ().write .format (file_format ).saveAsTable (table_name )
443443 else :
444444 self .to_spark_df ().createOrReplaceTempView (table_name )
445+
445446
446447 def _has_remote_warehouse_in_config (self ) -> bool :
447448 """
@@ -497,15 +498,35 @@ def to_remote_storage(self) -> List[str]:
497498 return aws_utils .list_s3_files (
498499 self ._config .offline_store .region , output_uri
499500 )
500-
501+ elif self ._config .offline_store .staging_location .startswith ("hdfs://" ):
502+ output_uri = os .path .join (
503+ self ._config .offline_store .staging_location , str (uuid .uuid4 ())
504+ )
505+ sdf .write .parquet (output_uri )
506+ spark_session = get_spark_session_or_start_new_with_repoconfig (
507+ store_config = self ._config .offline_store
508+ )
509+ return self ._list_hdfs_files (spark_session , output_uri )
501510 else :
502511 raise NotImplementedError (
503- "to_remote_storage is only implemented for file:// and s3 :// uri schemes"
512+ "to_remote_storage is only implemented for file://, s3:// and hdfs :// uri schemes"
504513 )
505514
506515 else :
507516 raise NotImplementedError ()
508517
def _list_hdfs_files(self, spark_session: SparkSession, uri: str) -> List[str]:
    """Return the paths of the files sitting directly under *uri* on HDFS.

    Goes through the py4j gateway of the given SparkSession to reach the
    JVM-side Hadoop ``FileSystem`` API. Only immediate children that are
    regular files are returned (non-recursive; directories are skipped).
    """
    gateway = spark_session._jvm
    hadoop_conf = spark_session._jsc.hadoopConfiguration()
    hdfs_path = gateway.org.apache.hadoop.fs.Path(uri)
    filesystem = gateway.org.apache.hadoop.fs.FileSystem.get(
        hdfs_path.toUri(), hadoop_conf
    )
    return [
        status.getPath().toString()
        for status in filesystem.listStatus(hdfs_path)
        if status.isFile()
    ]
529+
509530 @property
510531 def metadata (self ) -> Optional [RetrievalMetadata ]:
511532 """
0 commit comments