@@ -47,6 +47,8 @@ def pull_latest_from_table_or_query_ibis(
4747 end_date : datetime ,
4848 data_source_reader : Callable [[DataSource ], Table ],
4949 data_source_writer : Callable [[pyarrow .Table , DataSource ], None ],
50+ staging_location : Optional [str ] = None ,
51+ staging_location_endpoint_override : Optional [str ] = None ,
5052) -> RetrievalJob :
5153 fields = join_key_columns + feature_name_columns + [timestamp_field ]
5254 if created_timestamp_column :
@@ -82,6 +84,8 @@ def pull_latest_from_table_or_query_ibis(
8284 full_feature_names = False ,
8385 metadata = None ,
8486 data_source_writer = data_source_writer ,
87+ staging_location = staging_location ,
88+ staging_location_endpoint_override = staging_location_endpoint_override ,
8589 )
8690
8791
@@ -140,6 +144,8 @@ def get_historical_features_ibis(
140144 data_source_reader : Callable [[DataSource ], Table ],
141145 data_source_writer : Callable [[pyarrow .Table , DataSource ], None ],
142146 full_feature_names : bool = False ,
147+ staging_location : Optional [str ] = None ,
148+ staging_location_endpoint_override : Optional [str ] = None ,
143149) -> RetrievalJob :
144150 entity_schema = _get_entity_schema (
145151 entity_df = entity_df ,
@@ -231,6 +237,8 @@ def read_fv(
231237 max_event_timestamp = timestamp_range [1 ],
232238 ),
233239 data_source_writer = data_source_writer ,
240+ staging_location = staging_location ,
241+ staging_location_endpoint_override = staging_location_endpoint_override ,
234242 )
235243
236244
@@ -244,6 +252,8 @@ def pull_all_from_table_or_query_ibis(
244252 end_date : datetime ,
245253 data_source_reader : Callable [[DataSource ], Table ],
246254 data_source_writer : Callable [[pyarrow .Table , DataSource ], None ],
255+ staging_location : Optional [str ] = None ,
256+ staging_location_endpoint_override : Optional [str ] = None ,
247257) -> RetrievalJob :
248258 fields = join_key_columns + feature_name_columns + [timestamp_field ]
249259 start_date = start_date .astimezone (tz = utc )
@@ -270,6 +280,8 @@ def pull_all_from_table_or_query_ibis(
270280 full_feature_names = False ,
271281 metadata = None ,
272282 data_source_writer = data_source_writer ,
283+ staging_location = staging_location ,
284+ staging_location_endpoint_override = staging_location_endpoint_override ,
273285 )
274286
275287
@@ -411,6 +423,23 @@ def point_in_time_join(
411423 return acc_table
412424
413425
def list_s3_files(path: str, endpoint_url: Optional[str]) -> List[str]:
    """List the parquet objects stored under an S3 location.

    Args:
        path: Location to list, given either as ``s3://bucket/prefix`` or as
            ``bucket/prefix``. A bare bucket name lists the whole bucket.
        endpoint_url: Custom S3 endpoint handed to the boto3 client, or
            ``None`` to let boto3 pick its default endpoint.

    Returns:
        The ``s3://`` URI of every object under the prefix whose key ends
        with ``parquet``. Empty when nothing matches.
    """
    # Imported lazily so boto3 is only required when this helper is called.
    import boto3

    s3 = boto3.client("s3", endpoint_url=endpoint_url)
    if path.startswith("s3://"):
        path = path[len("s3://"):]
    # partition() copes with a bucket-only path (no "/"), where unpacking the
    # result of split("/", 1) would raise ValueError.
    bucket, _, prefix = path.partition("/")

    files: List[str] = []
    # A single list_objects_v2 call returns a bounded page of keys; the
    # paginator follows continuation tokens so large exports are not truncated.
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # "Contents" is absent from a page when no key matches the prefix.
        for content in page.get("Contents", []):
            key = content["Key"]
            if key.endswith("parquet"):
                files.append(f"s3://{bucket}/{key}")
    return files
441+
442+
414443class IbisRetrievalJob (RetrievalJob ):
415444 def __init__ (
416445 self ,
@@ -419,6 +448,8 @@ def __init__(
419448 full_feature_names ,
420449 metadata ,
421450 data_source_writer ,
451+ staging_location ,
452+ staging_location_endpoint_override ,
422453 ) -> None :
423454 super ().__init__ ()
424455 self .table = table
@@ -428,6 +459,8 @@ def __init__(
428459 self ._full_feature_names = full_feature_names
429460 self ._metadata = metadata
430461 self .data_source_writer = data_source_writer
462+ self .staging_location = staging_location
463+ self .staging_location_endpoint_override = staging_location_endpoint_override
431464
432465 def _to_df_internal (self , timeout : Optional [int ] = None ) -> pd .DataFrame :
433466 return self .table .execute ()
@@ -456,3 +489,15 @@ def persist(
456489 @property
457490 def metadata (self ) -> Optional [RetrievalMetadata ]:
458491 return self ._metadata
492+
493+ def supports_remote_storage_export (self ) -> bool :
494+ return self .staging_location is not None
495+
496+ def to_remote_storage (self ) -> List [str ]:
497+ path = self .staging_location + f"/{ str (uuid .uuid4 ())} "
498+
499+ storage_options = {"AWS_ENDPOINT_URL" : self .staging_location_endpoint_override }
500+
501+ self .table .to_delta (path , storage_options = storage_options )
502+
503+ return list_s3_files (path , self .staging_location_endpoint_override )
0 commit comments