Skip to content

Commit c63b037

Browse files
authored
Introduce an OnlineStore interface (#1628)
* Introduce an OnlineStore interface — Signed-off-by: Achal Shah <achals@gmail.com>
* format and lint — Signed-off-by: Achal Shah <achals@gmail.com>
* refactor redis into its own class too — Signed-off-by: Achal Shah <achals@gmail.com>
* tests and lint — Signed-off-by: Achal Shah <achals@gmail.com>
* remove import — Signed-off-by: Achal Shah <achals@gmail.com>
* add docs, refactor to add a setup and teardown to the online store and invoke it from the providers — Signed-off-by: Achal Shah <achals@gmail.com>
* more simplifications — Signed-off-by: Achal Shah <achals@gmail.com>
* make format — Signed-off-by: Achal Shah <achals@gmail.com>
* make lint — Signed-off-by: Achal Shah <achals@gmail.com>
* Add redis write path — Signed-off-by: Achal Shah <achals@gmail.com>
* Make instance methods instead of classmethods, and cache clients/connections — Signed-off-by: Achal Shah <achals@gmail.com>
* make format — Signed-off-by: Achal Shah <achals@gmail.com>
* Use update instead of setup — Signed-off-by: Achal Shah <achals@gmail.com>
1 parent a3d7c86 commit c63b037

19 files changed

+892
-580
lines changed

sdk/python/feast/errors.py

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -71,6 +71,13 @@ def __init__(self, offline_store_name: str, data_source_name: str):
7171
)
7272

7373

74+
class FeastOnlineStoreUnsupportedDataSource(Exception):
75+
def __init__(self, online_store_name: str, data_source_name: str):
76+
super().__init__(
77+
f"Online Store '{online_store_name}' does not support data source '{data_source_name}'"
78+
)
79+
80+
7481
class FeastEntityDFMissingColumnsError(Exception):
7582
def __init__(self, expected, missing):
7683
super().__init__(

sdk/python/feast/feature_store.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -541,7 +541,7 @@ def get_online_features(
541541
table, union_of_entity_keys, entity_name_to_join_key_map
542542
)
543543
read_rows = provider.online_read(
544-
project=self.project,
544+
config=self.config,
545545
table=table,
546546
entity_keys=entity_keys,
547547
requested_features=requested_features,

sdk/python/feast/infra/gcp.py

Lines changed: 20 additions & 172 deletions
Original file line number | Diff line number | Diff line change
@@ -1,18 +1,14 @@
1-
import itertools
21
from datetime import datetime
3-
from multiprocessing.pool import ThreadPool
4-
from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple, Union
2+
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
53

6-
import mmh3
74
import pandas
85
from tqdm import tqdm
96

10-
from feast import FeatureTable, utils
7+
from feast import FeatureTable
118
from feast.entity import Entity
12-
from feast.errors import FeastProviderLoginError
139
from feast.feature_view import FeatureView
14-
from feast.infra.key_encoding_utils import serialize_entity_key
1510
from feast.infra.offline_stores.helpers import get_offline_store_from_config
11+
from feast.infra.online_stores.helpers import get_online_store_from_config
1612
from feast.infra.provider import (
1713
Provider,
1814
RetrievalJob,
@@ -23,42 +19,17 @@
2319
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
2420
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
2521
from feast.registry import Registry
26-
from feast.repo_config import DatastoreOnlineStoreConfig, RepoConfig
27-
28-
try:
29-
from google.auth.exceptions import DefaultCredentialsError
30-
from google.cloud import datastore
31-
except ImportError as e:
32-
from feast.errors import FeastExtrasDependencyImportError
33-
34-
raise FeastExtrasDependencyImportError("gcp", str(e))
22+
from feast.repo_config import RepoConfig
3523

3624

3725
class GcpProvider(Provider):
3826
_gcp_project_id: Optional[str]
3927
_namespace: Optional[str]
4028

4129
def __init__(self, config: RepoConfig):
42-
assert isinstance(config.online_store, DatastoreOnlineStoreConfig)
43-
self._gcp_project_id = config.online_store.project_id
44-
self._namespace = config.online_store.namespace
45-
self._write_concurrency = config.online_store.write_concurrency
46-
self._write_batch_size = config.online_store.write_batch_size
47-
48-
assert config.offline_store is not None
30+
self.repo_config = config
4931
self.offline_store = get_offline_store_from_config(config.offline_store)
50-
51-
def _initialize_client(self):
52-
try:
53-
return datastore.Client(
54-
project=self._gcp_project_id, namespace=self._namespace
55-
)
56-
except DefaultCredentialsError as e:
57-
raise FeastProviderLoginError(
58-
str(e)
59-
+ '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your '
60-
"local Google Cloud account "
61-
)
32+
self.online_store = get_online_store_from_config(config.online_store)
6233

6334
def update_infra(
6435
self,
@@ -69,85 +40,43 @@ def update_infra(
6940
entities_to_keep: Sequence[Entity],
7041
partial: bool,
7142
):
72-
73-
client = self._initialize_client()
74-
75-
for table in tables_to_keep:
76-
key = client.key("Project", project, "Table", table.name)
77-
entity = datastore.Entity(
78-
key=key, exclude_from_indexes=("created_ts", "event_ts", "values")
79-
)
80-
entity.update({"created_ts": datetime.utcnow()})
81-
client.put(entity)
82-
83-
for table in tables_to_delete:
84-
_delete_all_values(
85-
client, client.key("Project", project, "Table", table.name)
86-
)
87-
88-
# Delete the table metadata datastore entity
89-
key = client.key("Project", project, "Table", table.name)
90-
client.delete(key)
43+
self.online_store.update(
44+
config=self.repo_config,
45+
tables_to_delete=tables_to_delete,
46+
tables_to_keep=tables_to_keep,
47+
entities_to_keep=entities_to_keep,
48+
entities_to_delete=entities_to_delete,
49+
partial=partial,
50+
)
9151

9252
def teardown_infra(
9353
self,
9454
project: str,
9555
tables: Sequence[Union[FeatureTable, FeatureView]],
9656
entities: Sequence[Entity],
9757
) -> None:
98-
client = self._initialize_client()
99-
100-
for table in tables:
101-
_delete_all_values(
102-
client, client.key("Project", project, "Table", table.name)
103-
)
104-
105-
# Delete the table metadata datastore entity
106-
key = client.key("Project", project, "Table", table.name)
107-
client.delete(key)
58+
self.online_store.teardown(self.repo_config, tables, entities)
10859

10960
def online_write_batch(
11061
self,
111-
project: str,
62+
config: RepoConfig,
11263
table: Union[FeatureTable, FeatureView],
11364
data: List[
11465
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
11566
],
11667
progress: Optional[Callable[[int], Any]],
11768
) -> None:
118-
client = self._initialize_client()
119-
120-
pool = ThreadPool(processes=self._write_concurrency)
121-
pool.map(
122-
lambda b: _write_minibatch(client, project, table, b, progress),
123-
_to_minibatches(data, batch_size=self._write_batch_size),
124-
)
69+
self.online_store.online_write_batch(config, table, data, progress)
12570

12671
def online_read(
12772
self,
128-
project: str,
73+
config: RepoConfig,
12974
table: Union[FeatureTable, FeatureView],
13075
entity_keys: List[EntityKeyProto],
13176
requested_features: List[str] = None,
13277
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
133-
client = self._initialize_client()
78+
result = self.online_store.online_read(config, table, entity_keys)
13479

135-
result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = []
136-
for entity_key in entity_keys:
137-
document_id = compute_datastore_entity_id(entity_key)
138-
key = client.key(
139-
"Project", project, "Table", table.name, "Row", document_id
140-
)
141-
value = client.get(key)
142-
if value is not None:
143-
res = {}
144-
for feature_name, value_bin in value["values"].items():
145-
val = ValueProto()
146-
val.ParseFromString(value_bin)
147-
res[feature_name] = val
148-
result.append((value["event_ts"], res))
149-
else:
150-
result.append((None, None))
15180
return result
15281

15382
def materialize_single_feature_view(
@@ -188,7 +117,7 @@ def materialize_single_feature_view(
188117

189118
with tqdm_builder(len(rows_to_write)) as pbar:
190119
self.online_write_batch(
191-
project, feature_view, rows_to_write, lambda x: pbar.update(x)
120+
self.repo_config, feature_view, rows_to_write, lambda x: pbar.update(x)
192121
)
193122

194123
def get_historical_features(
@@ -209,84 +138,3 @@ def get_historical_features(
209138
project=project,
210139
)
211140
return job
212-
213-
214-
ProtoBatch = Sequence[
215-
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
216-
]
217-
218-
219-
def _to_minibatches(data: ProtoBatch, batch_size) -> Iterator[ProtoBatch]:
220-
"""
221-
Split data into minibatches, making sure we stay under GCP datastore transaction size
222-
limits.
223-
"""
224-
iterable = iter(data)
225-
226-
while True:
227-
batch = list(itertools.islice(iterable, batch_size))
228-
if len(batch) > 0:
229-
yield batch
230-
else:
231-
break
232-
233-
234-
def _write_minibatch(
235-
client,
236-
project: str,
237-
table: Union[FeatureTable, FeatureView],
238-
data: Sequence[
239-
Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]
240-
],
241-
progress: Optional[Callable[[int], Any]],
242-
):
243-
entities = []
244-
for entity_key, features, timestamp, created_ts in data:
245-
document_id = compute_datastore_entity_id(entity_key)
246-
247-
key = client.key("Project", project, "Table", table.name, "Row", document_id,)
248-
249-
entity = datastore.Entity(
250-
key=key, exclude_from_indexes=("created_ts", "event_ts", "values")
251-
)
252-
253-
entity.update(
254-
dict(
255-
key=entity_key.SerializeToString(),
256-
values={k: v.SerializeToString() for k, v in features.items()},
257-
event_ts=utils.make_tzaware(timestamp),
258-
created_ts=(
259-
utils.make_tzaware(created_ts) if created_ts is not None else None
260-
),
261-
)
262-
)
263-
entities.append(entity)
264-
with client.transaction():
265-
client.put_multi(entities)
266-
267-
if progress:
268-
progress(len(entities))
269-
270-
271-
def _delete_all_values(client, key) -> None:
272-
"""
273-
Delete all data under the key path in datastore.
274-
"""
275-
while True:
276-
query = client.query(kind="Row", ancestor=key)
277-
entities = list(query.fetch(limit=1000))
278-
if not entities:
279-
return
280-
281-
for entity in entities:
282-
client.delete(entity.key)
283-
284-
285-
def compute_datastore_entity_id(entity_key: EntityKeyProto) -> str:
286-
"""
287-
Compute Datastore Entity id given Feast Entity Key.
288-
289-
Remember that Datastore Entity is a concept from the Datastore data model, that has nothing to
290-
do with the Entity concept we have in Feast.
291-
"""
292-
return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex()

0 commit comments

Comments (0)