Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add faiss & in memory online store
Signed-off-by: cmuhao <sduxuhao@gmail.com>
  • Loading branch information
HaoXuAI committed Aug 29, 2024
commit 7d7a1c6320338dbd4dbbbfa34f54615843684b1a
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from feast.protos.feast.types.EntityKey_pb2 import EntityKey
from feast.protos.feast.types.Value_pb2 import Value
from feast.repo_config import FeastConfigBaseModel
from feast.infra.key_encoding_utils import serialize_entity_key, deserialize_entity_key


class FaissOnlineStoreConfig(FeastConfigBaseModel):
Expand Down Expand Up @@ -99,10 +100,10 @@ def online_read(
if self._index is None:
return [(None, None)] * len(entity_keys)

results: List[Tuple[Optional[datetime], Optional[Dict[str, Value]]]] = []
results = []
for entity_key in entity_keys:
entity_key_tuple = tuple(entity_key.name, entity_key.join_keys)
idx = self._in_memory_store.entity_keys.get(entity_key_tuple, -1)
serialized_key = serialize_entity_key(entity_key, entity_key_serialization_version=2)
idx = self._in_memory_store.entity_keys.get(serialized_key, -1)
if idx == -1:
results.append((None, None))
else:
Expand All @@ -128,10 +129,10 @@ def online_write_batch(
return

feature_vectors = []
entity_key_tuples = []
serialized_keys = []

for entity_key, feature_dict, _, _ in data:
entity_key_tuple = (entity_key.name, entity_key.join_keys)
serialized_key = serialize_entity_key(entity_key, entity_key_serialization_version=2)
feature_vector = np.array(
[
feature_dict[name].double_val
Expand All @@ -141,12 +142,12 @@ def online_write_batch(
)

feature_vectors.append(feature_vector)
entity_key_tuples.append(entity_key_tuple)
serialized_keys.append(serialized_key)

feature_vectors_array = np.array(feature_vectors)

existing_indices = [
self._in_memory_store.entity_keys.get(ekt, -1) for ekt in entity_key_tuples
self._in_memory_store.entity_keys.get(sk, -1) for sk in serialized_keys
]
mask = np.array(existing_indices) != -1
if np.any(mask):
Expand All @@ -159,8 +160,8 @@ def online_write_batch(
)
self._index.add(feature_vectors_array)

for ekt, idx in zip(entity_key_tuples, new_indices):
self._in_memory_store.entity_keys[ekt] = idx
for sk, idx in zip(serialized_keys, new_indices):
self._in_memory_store.entity_keys[sk] = idx

if progress:
progress(len(data))
Expand Down