Skip to content

Commit 0e42150

Browse files
authored
feat: Feast/IKV upgrade client version (feast-dev#4200)
1 parent bf99640 commit 0e42150

2 files changed

Lines changed: 22 additions & 14 deletions

File tree

  • sdk/python/feast/infra/online_stores/contrib/ikv_online_store

sdk/python/feast/infra/online_stores/contrib/ikv_online_store/ikv.py

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,8 @@ def online_write_batch(
8282
progress: Function to be called once a batch of rows is written to the online store, used
8383
to show progress.
8484
"""
85-
# update should have been called before
86-
if self._writer is None:
87-
return
85+
self._init_writer(config=config)
86+
assert self._writer is not None
8887

8988
for entity_key, features, event_timestamp, _ in data:
9089
entity_id: str = compute_entity_id(
@@ -120,6 +119,8 @@ def online_read(
120119
item is the event timestamp for the row, and the second item is a dict mapping feature names
121120
to values, which are returned in proto format.
122121
"""
122+
self._init_reader(config=config)
123+
123124
if not len(entity_keys):
124125
return []
125126

@@ -174,7 +175,6 @@ def _decode_fields_for_primary_key(
174175

175176
return dt, features
176177

177-
# called before any read/write requests are issued
178178
@log_exceptions_and_usage(online_store="ikv")
179179
def update(
180180
self,
@@ -199,7 +199,7 @@ def update(
199199
partial: If true, tables_to_delete and tables_to_keep are not exhaustive lists, so
200200
infrastructure corresponding to other feature views should be not be touched.
201201
"""
202-
self._init_clients(config=config)
202+
self._init_writer(config=config)
203203
assert self._writer is not None
204204

205205
# note: we assume tables_to_keep does not overlap with tables_to_delete
@@ -223,7 +223,7 @@ def teardown(
223223
tables: Feature views whose corresponding infrastructure should be deleted.
224224
entities: Entities whose corresponding infrastructure should be deleted.
225225
"""
226-
self._init_clients(config=config)
226+
self._init_writer(config=config)
227227
assert self._writer is not None
228228

229229
# drop fields corresponding to this feature-view
@@ -269,20 +269,28 @@ def _create_document(
269269

270270
return builder.build()
271271

272-
def _init_clients(self, config: RepoConfig):
273-
"""Initializes (if required) reader/writer ikv clients."""
274-
online_config = config.online_store
275-
assert isinstance(online_config, IKVOnlineStoreConfig)
276-
client_options = IKVOnlineStore._config_to_client_options(online_config)
277-
272+
def _init_writer(self, config: RepoConfig):
273+
"""Initializes ikv writer client."""
278274
# initialize writer
279275
if self._writer is None:
276+
online_config = config.online_store
277+
assert isinstance(online_config, IKVOnlineStoreConfig)
278+
client_options = IKVOnlineStore._config_to_client_options(online_config)
279+
280280
self._writer = create_new_writer(client_options)
281+
self._writer.startup() # blocking operation
281282

282-
# initialize reader, iff mount_dir is specified
283+
def _init_reader(self, config: RepoConfig):
284+
"""Initializes ikv reader client."""
285+
# initialize reader
283286
if self._reader is None:
287+
online_config = config.online_store
288+
assert isinstance(online_config, IKVOnlineStoreConfig)
289+
client_options = IKVOnlineStore._config_to_client_options(online_config)
290+
284291
if online_config.mount_directory and len(online_config.mount_directory) > 0:
285292
self._reader = create_new_reader(client_options)
293+
self._reader.startup() # blocking operation
286294

287295
@staticmethod
288296
def _config_to_client_options(config: IKVOnlineStoreConfig) -> ClientOptions:

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@
127127
]
128128

129129
IKV_REQUIRED = [
130-
"ikvpy>=0.0.23",
130+
"ikvpy>=0.0.36",
131131
]
132132

133133
HAZELCAST_REQUIRED = [

0 commit comments

Comments
 (0)