@@ -82,9 +82,8 @@ def online_write_batch(
8282 progress: Function to be called once a batch of rows is written to the online store, used
8383 to show progress.
8484 """
85- # update should have been called before
86- if self ._writer is None :
87- return
85+ self ._init_writer (config = config )
86+ assert self ._writer is not None
8887
8988 for entity_key , features , event_timestamp , _ in data :
9089 entity_id : str = compute_entity_id (
@@ -120,6 +119,8 @@ def online_read(
120119 item is the event timestamp for the row, and the second item is a dict mapping feature names
121120 to values, which are returned in proto format.
122121 """
122+ self ._init_reader (config = config )
123+
123124 if not len (entity_keys ):
124125 return []
125126
@@ -174,7 +175,6 @@ def _decode_fields_for_primary_key(
174175
175176 return dt , features
176177
177- # called before any read/write requests are issued
178178 @log_exceptions_and_usage (online_store = "ikv" )
179179 def update (
180180 self ,
@@ -199,7 +199,7 @@ def update(
199199 partial: If true, tables_to_delete and tables_to_keep are not exhaustive lists, so
200200 infrastructure corresponding to other feature views should be not be touched.
201201 """
202- self ._init_clients (config = config )
202+ self ._init_writer (config = config )
203203 assert self ._writer is not None
204204
205205 # note: we assume tables_to_keep does not overlap with tables_to_delete
@@ -223,7 +223,7 @@ def teardown(
223223 tables: Feature views whose corresponding infrastructure should be deleted.
224224 entities: Entities whose corresponding infrastructure should be deleted.
225225 """
226- self ._init_clients (config = config )
226+ self ._init_writer (config = config )
227227 assert self ._writer is not None
228228
229229 # drop fields corresponding to this feature-view
@@ -269,20 +269,28 @@ def _create_document(
269269
270270 return builder .build ()
271271
272- def _init_clients (self , config : RepoConfig ):
273- """Initializes (if required) reader/writer ikv clients."""
274- online_config = config .online_store
275- assert isinstance (online_config , IKVOnlineStoreConfig )
276- client_options = IKVOnlineStore ._config_to_client_options (online_config )
277-
272+ def _init_writer (self , config : RepoConfig ):
273+ """Initializes ikv writer client."""
278274 # initialize writer
279275 if self ._writer is None :
276+ online_config = config .online_store
277+ assert isinstance (online_config , IKVOnlineStoreConfig )
278+ client_options = IKVOnlineStore ._config_to_client_options (online_config )
279+
280280 self ._writer = create_new_writer (client_options )
281+ self ._writer .startup () # blocking operation
281282
282- # initialize reader, iff mount_dir is specified
283+ def _init_reader (self , config : RepoConfig ):
284+ """Initializes ikv reader client."""
285+ # initialize reader
283286 if self ._reader is None :
287+ online_config = config .online_store
288+ assert isinstance (online_config , IKVOnlineStoreConfig )
289+ client_options = IKVOnlineStore ._config_to_client_options (online_config )
290+
284291 if online_config .mount_directory and len (online_config .mount_directory ) > 0 :
285292 self ._reader = create_new_reader (client_options )
293+ self ._reader .startup () # blocking operation
286294
287295 @staticmethod
288296 def _config_to_client_options (config : IKVOnlineStoreConfig ) -> ClientOptions :
0 commit comments