@@ -46,8 +46,8 @@ def pull_latest_from_table_or_query_ibis(
4646 created_timestamp_column : Optional [str ],
4747 start_date : datetime ,
4848 end_date : datetime ,
49- data_source_reader : Callable [[DataSource ], Table ],
50- data_source_writer : Callable [[pyarrow .Table , DataSource ], None ],
49+ data_source_reader : Callable [[DataSource , str ], Table ],
50+ data_source_writer : Callable [[pyarrow .Table , DataSource , str ], None ],
5151 staging_location : Optional [str ] = None ,
5252 staging_location_endpoint_override : Optional [str ] = None ,
5353) -> RetrievalJob :
@@ -57,7 +57,7 @@ def pull_latest_from_table_or_query_ibis(
5757 start_date = start_date .astimezone (tz = timezone .utc )
5858 end_date = end_date .astimezone (tz = timezone .utc )
5959
60- table = data_source_reader (data_source )
60+ table = data_source_reader (data_source , str ( config . repo_path ) )
6161
6262 table = table .select (* fields )
6363
@@ -87,6 +87,7 @@ def pull_latest_from_table_or_query_ibis(
8787 data_source_writer = data_source_writer ,
8888 staging_location = staging_location ,
8989 staging_location_endpoint_override = staging_location_endpoint_override ,
90+ repo_path = str (config .repo_path ),
9091 )
9192
9293
@@ -147,8 +148,8 @@ def get_historical_features_ibis(
147148 entity_df : Union [pd .DataFrame , str ],
148149 registry : BaseRegistry ,
149150 project : str ,
150- data_source_reader : Callable [[DataSource ], Table ],
151- data_source_writer : Callable [[pyarrow .Table , DataSource ], None ],
151+ data_source_reader : Callable [[DataSource , str ], Table ],
152+ data_source_writer : Callable [[pyarrow .Table , DataSource , str ], None ],
152153 full_feature_names : bool = False ,
153154 staging_location : Optional [str ] = None ,
154155 staging_location_endpoint_override : Optional [str ] = None ,
@@ -174,7 +175,9 @@ def get_historical_features_ibis(
174175 def read_fv (
175176 feature_view : FeatureView , feature_refs : List [str ], full_feature_names : bool
176177 ) -> Tuple :
177- fv_table : Table = data_source_reader (feature_view .batch_source )
178+ fv_table : Table = data_source_reader (
179+ feature_view .batch_source , str (config .repo_path )
180+ )
178181
179182 for old_name , new_name in feature_view .batch_source .field_mapping .items ():
180183 if old_name in fv_table .columns :
@@ -247,6 +250,7 @@ def read_fv(
247250 data_source_writer = data_source_writer ,
248251 staging_location = staging_location ,
249252 staging_location_endpoint_override = staging_location_endpoint_override ,
253+ repo_path = str (config .repo_path ),
250254 )
251255
252256
@@ -258,16 +262,16 @@ def pull_all_from_table_or_query_ibis(
258262 timestamp_field : str ,
259263 start_date : datetime ,
260264 end_date : datetime ,
261- data_source_reader : Callable [[DataSource ], Table ],
262- data_source_writer : Callable [[pyarrow .Table , DataSource ], None ],
265+ data_source_reader : Callable [[DataSource , str ], Table ],
266+ data_source_writer : Callable [[pyarrow .Table , DataSource , str ], None ],
263267 staging_location : Optional [str ] = None ,
264268 staging_location_endpoint_override : Optional [str ] = None ,
265269) -> RetrievalJob :
266270 fields = join_key_columns + feature_name_columns + [timestamp_field ]
267271 start_date = start_date .astimezone (tz = timezone .utc )
268272 end_date = end_date .astimezone (tz = timezone .utc )
269273
270- table = data_source_reader (data_source )
274+ table = data_source_reader (data_source , str ( config . repo_path ) )
271275
272276 table = table .select (* fields )
273277
@@ -290,6 +294,7 @@ def pull_all_from_table_or_query_ibis(
290294 data_source_writer = data_source_writer ,
291295 staging_location = staging_location ,
292296 staging_location_endpoint_override = staging_location_endpoint_override ,
297+ repo_path = str (config .repo_path ),
293298 )
294299
295300
@@ -319,7 +324,7 @@ def offline_write_batch_ibis(
319324 feature_view : FeatureView ,
320325 table : pyarrow .Table ,
321326 progress : Optional [Callable [[int ], Any ]],
322- data_source_writer : Callable [[pyarrow .Table , DataSource ], None ],
327+ data_source_writer : Callable [[pyarrow .Table , DataSource , str ], None ],
323328):
324329 pa_schema , column_names = get_pyarrow_schema_from_batch_source (
325330 config , feature_view .batch_source
@@ -330,7 +335,9 @@ def offline_write_batch_ibis(
330335 f"The schema is expected to be { pa_schema } with the columns (in this exact order) to be { column_names } ."
331336 )
332337
333- data_source_writer (ibis .memtable (table ), feature_view .batch_source )
338+ data_source_writer (
339+ ibis .memtable (table ), feature_view .batch_source , str (config .repo_path )
340+ )
334341
335342
336343def deduplicate (
@@ -469,6 +476,7 @@ def __init__(
469476 data_source_writer ,
470477 staging_location ,
471478 staging_location_endpoint_override ,
479+ repo_path ,
472480 ) -> None :
473481 super ().__init__ ()
474482 self .table = table
@@ -480,6 +488,7 @@ def __init__(
480488 self .data_source_writer = data_source_writer
481489 self .staging_location = staging_location
482490 self .staging_location_endpoint_override = staging_location_endpoint_override
491+ self .repo_path = repo_path
483492
484493 def _to_df_internal (self , timeout : Optional [int ] = None ) -> pd .DataFrame :
485494 return self .table .execute ()
@@ -502,7 +511,11 @@ def persist(
502511 timeout : Optional [int ] = None ,
503512 ):
504513 self .data_source_writer (
505- self .table , storage .to_data_source (), "overwrite" , allow_overwrite
514+ self .table ,
515+ storage .to_data_source (),
516+ self .repo_path ,
517+ "overwrite" ,
518+ allow_overwrite ,
506519 )
507520
508521 @property