Commit 09f4208

Read and write path for Datastore and SQLite (feast-dev#1376)
* local read-write path
* commas
* bump python version
* fix dockerfile
* fixing tests
* don't use fancy sqlite upsert as it is not in python 3.7.5

Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>
1 parent 53ebe47 commit 09f4208

16 files changed: +422 −43
(Two binary files changed; contents not shown.)

docs/specs/online_store_format.md

Lines changed: 16 additions & 16 deletions

@@ -59,38 +59,38 @@ Here's an example of how the entire thing looks like:
 
 However, we'll address this issue in future versions of the protocol.
 
-## Cloud Firestore Online Store Format
+## Google Datastore Online Store Format
 
-[Firebase data model](https://firebase.google.com/docs/firestore/data-model) is a hierarchy of documents that can contain (sub)-collections. This structure can be multiple levels deep; documents and subcollections alternate in this hierarchy.
+[Datastore data model](https://cloud.google.com/datastore/docs/concepts/entities) is a collection of documents called Entities (not to be confused with Feast Entities). Documents can be organized in a hierarchy using Kinds.
 
-We use the following structure to store feature data in Firestore:
-* at the first level, there is a collection for each Feast project
-* at the second level, in each project collection, there is a Firebase document for each Feature Table
-* at the third level, in the Feature Table document, there is a subcollection called `values` that contains a document per feature row. That document contains the following fields:
-  * `key` contains the entity key as a serialized `feast.types.EntityKey` proto
-  * `values` contains a feature-name-to-value map, with values serialized as `feast.types.Value` protos
-  * `event_ts` contains the event timestamp (in the native Firestore timestamp format)
-  * `created_ts` contains the write timestamp (in the native Firestore timestamp format)
+We use the following structure to store feature data in Datastore:
+* There is a Datastore Entity for each Feast Project, with Kind `Project`.
+* Under that Datastore Entity, there is a Datastore Entity for each Feast Feature Table or View, with Kind `Table`. It contains one additional field, `created_ts`, holding the timestamp when this Datastore Entity was created.
+* Under that Datastore Entity, there is a Datastore Entity for each Feast Entity Key, with Kind `Row`. It contains the following fields:
+  * `key` contains the entity key as a serialized `feast.types.EntityKey` proto
+  * `values` contains a feature-name-to-value map, with values serialized as `feast.types.Value` protos
+  * `event_ts` contains the event timestamp (in the Datastore timestamp format)
+  * `created_ts` contains the write timestamp (in the Datastore timestamp format)
 
-The document id for the feature document is computed by hashing the entity key using the murmurhash3_128 algorithm, as follows:
+The id for the `Row` Datastore Entity is computed by hashing the entity key using the murmurhash3_128 algorithm, as follows:
 
-1. hash entity names, sorted in alphanumeric order, by serializing them to bytes using the Value Serialization steps below
-2. hash the entity values in the same order as the corresponding entity names, by serializing them to bytes using the Value Serialization steps below
+1. Hash entity names, sorted in alphanumeric order, by serializing them to bytes using the Value Serialization steps below.
+2. Hash the entity values in the same order as the corresponding entity names, by serializing them to bytes using the Value Serialization steps below.
 
 Value Serialization:
 * Store the type of the value (ValueType enum) as a little-endian uint32.
-* Store the byte length of the serialized value as a little-endian uint32
+* Store the byte length of the serialized value as a little-endian uint32.
 * Store the serialized value as bytes:
   - binary values are serialized as is
   - string values are serialized as utf8 strings
   - int64 and int32 are hashed as their little-endian byte representation (8 and 4 bytes respectively)
-  - bool is hashed as a 0 or 1 byte
+  - bool is hashed as a 0 or 1 byte.
 
 Other types of entity keys are not supported in this version of the specification, when using Cloud Firestore.
 
 **Example:**
 
-![Firestore Online Example](firebase_online_example.png)
+![Datastore Online Example](datastore_online_example.png)
 
 # Appendix
infra/docker/ci/Dockerfile

Lines changed: 2 additions & 2 deletions

@@ -29,7 +29,7 @@ ENV MAVEN_HOME /usr/share/maven
 ENV MAVEN_CONFIG "/root/.m2"
 
 # Install Make and Python
-ENV PYTHON_VERSION 3.6
+ENV PYTHON_VERSION 3.7
 
 RUN apt-get install -y build-essential curl python${PYTHON_VERSION} \
     python${PYTHON_VERSION}-dev python${PYTHON_VERSION}-distutils && \
@@ -77,7 +77,7 @@ RUN PROTOC_ZIP=protoc-${PROTOC_VERSION}-linux-x86_64.zip && \
 RUN curl -sL https://aka.ms/InstallAzureCLIDeb | bash
 
 # Install kubectl
-RUN apt-get install -y kubectl=1.20.2-00
+RUN apt-get install -y kubectl=1.20.4-00
 
 # Install helm
 RUN curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/master/scripts/get-helm-3 && \

sdk/python/feast/feature_store.py

Lines changed: 30 additions & 7 deletions

@@ -11,24 +11,47 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from pathlib import Path
 from typing import Optional
 
-from feast.repo_config import RepoConfig, load_repo_config
+from feast.infra.provider import Provider, get_provider
+from feast.registry import Registry
+from feast.repo_config import (
+    LocalOnlineStoreConfig,
+    OnlineStoreConfig,
+    RepoConfig,
+    load_repo_config,
+)
 
 
 class FeatureStore:
     """
     A FeatureStore object is used to define, create, and retrieve features.
     """
 
+    config: RepoConfig
+
     def __init__(
-        self, config_path: Optional[str], config: Optional[RepoConfig],
+        self, repo_path: Optional[str], config: Optional[RepoConfig],
     ):
-        if config_path is None or config is None:
-            raise Exception("You cannot specify both config_path and config")
+        if repo_path is not None and config is not None:
+            raise Exception("You cannot specify both repo_path and config")
         if config is not None:
             self.config = config
-        elif config_path is not None:
-            self.config = load_repo_config(config_path)
+        elif repo_path is not None:
+            self.config = load_repo_config(Path(repo_path))
         else:
-            self.config = RepoConfig()
+            self.config = RepoConfig(
+                metadata_store="./metadata.db",
+                project="default",
+                provider="local",
+                online_store=OnlineStoreConfig(
+                    local=LocalOnlineStoreConfig("online_store.db")
+                ),
+            )
+
+    def _get_provider(self) -> Provider:
+        return get_provider(self.config)
+
+    def _get_registry(self) -> Registry:
+        return Registry(self.config.metadata_store)
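The constructor above enforces an "at most one of" contract between `repo_path` and `config`, with local defaults when both are omitted. A minimal self-contained sketch of that pattern, using a plain dict in place of `RepoConfig` (class and field names here are illustrative, not the Feast API):

```python
from typing import Optional

class StoreSketch:
    """Mirrors the FeatureStore constructor contract: repo_path and config
    are mutually exclusive, and omitting both falls back to local defaults."""

    def __init__(self, repo_path: Optional[str] = None, config: Optional[dict] = None):
        if repo_path is not None and config is not None:
            raise ValueError("You cannot specify both repo_path and config")
        if config is not None:
            # Explicit config wins when provided.
            self.config = config
        elif repo_path is not None:
            # Stands in for load_repo_config(Path(repo_path)).
            self.config = {"loaded_from": repo_path}
        else:
            # Default local setup, analogous to the RepoConfig(...) fallback.
            self.config = {"provider": "local", "online_store": "online_store.db"}
```

Checking `is not None` on both arguments (rather than the original `is None or is None`) is what makes the exclusivity test correct while still allowing the all-defaults case.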

sdk/python/feast/infra/gcp.py

Lines changed: 97 additions & 12 deletions

@@ -1,17 +1,24 @@
 from datetime import datetime
-from typing import List, Optional
+from typing import Dict, List, Optional, Tuple
+
+import mmh3
+from pytz import utc
 
 from feast import FeatureTable
 from feast.infra.provider import Provider
 from feast.repo_config import DatastoreOnlineStoreConfig
+from feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
+from feast.types.Value_pb2 import Value as ValueProto
+
+from .key_encoding_utils import serialize_entity_key
 
 
 def _delete_all_values(client, key) -> None:
     """
     Delete all data under the key path in datastore.
     """
     while True:
-        query = client.query(kind="Value", ancestor=key)
+        query = client.query(kind="Row", ancestor=key)
         entities = list(query.fetch(limit=1000))
         if not entities:
             return
@@ -21,19 +28,37 @@ def _delete_all_values(client, key) -> None:
             client.delete(entity.key)
 
 
+def compute_datastore_entity_id(entity_key: EntityKeyProto) -> str:
+    """
+    Compute a Datastore Entity id for a given Feast Entity Key.
+
+    Remember that a Datastore Entity is a concept from the Datastore data model
+    that has nothing to do with the Entity concept in Feast.
+    """
+    return mmh3.hash_bytes(serialize_entity_key(entity_key)).hex()
+
+
+def _make_tzaware(t: datetime):
+    """ We assume tz-naive datetimes are UTC """
+    if t.tzinfo is None:
+        return t.replace(tzinfo=utc)
+    else:
+        return t
+
+
 class Gcp(Provider):
-    _project_id: Optional[str]
+    _gcp_project_id: Optional[str]
 
     def __init__(self, config: Optional[DatastoreOnlineStoreConfig]):
         if config:
-            self._project_id = config.project_id
+            self._gcp_project_id = config.project_id
         else:
-            self._project_id = None
+            self._gcp_project_id = None
 
     def _initialize_client(self):
         from google.cloud import datastore
 
-        if self._project_id is not None:
+        if self._gcp_project_id is not None:
             return datastore.Client(self._gcp_project_id)
         else:
             return datastore.Client()
@@ -49,28 +74,88 @@ def update_infra(
         client = self._initialize_client()
 
         for table in tables_to_keep:
-            key = client.key("FeastProject", project, "FeatureTable", table.name)
+            key = client.key("Project", project, "Table", table.name)
             entity = datastore.Entity(key=key)
-            entity.update({"created_at": datetime.utcnow()})
+            entity.update({"created_ts": datetime.utcnow()})
             client.put(entity)
 
         for table in tables_to_delete:
             _delete_all_values(
-                client, client.key("FeastProject", project, "FeatureTable", table.name)
+                client, client.key("Project", project, "Table", table.name)
             )
 
             # Delete the table metadata datastore entity
-            key = client.key("FeastProject", project, "FeatureTable", table.name)
+            key = client.key("Project", project, "Table", table.name)
             client.delete(key)
 
     def teardown_infra(self, project: str, tables: List[FeatureTable]) -> None:
         client = self._initialize_client()
 
         for table in tables:
             _delete_all_values(
-                client, client.key("FeastProject", project, "FeatureTable", table.name)
+                client, client.key("Project", project, "Table", table.name)
             )
 
             # Delete the table metadata datastore entity
-            key = client.key("FeastProject", project, "FeatureTable", table.name)
+            key = client.key("Project", project, "Table", table.name)
             client.delete(key)
+
+    def online_write_batch(
+        self,
+        project: str,
+        table: FeatureTable,
+        data: List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime]],
+        created_ts: datetime,
+    ) -> None:
+        from google.cloud import datastore
+
+        client = self._initialize_client()
+
+        for entity_key, features, timestamp in data:
+            document_id = compute_datastore_entity_id(entity_key)
+
+            key = client.key(
+                "Project", project, "Table", table.name, "Row", document_id,
+            )
+            with client.transaction():
+                entity = client.get(key)
+                if entity is not None:
+                    if entity["event_ts"] > _make_tzaware(timestamp):
+                        # Do not overwrite feature values computed from fresher data
+                        continue
+                    elif entity["event_ts"] == _make_tzaware(timestamp) and entity[
+                        "created_ts"
+                    ] > _make_tzaware(created_ts):
+                        # Do not overwrite feature values computed from the same data,
+                        # but computed later than this one
+                        continue
+                else:
+                    entity = datastore.Entity(key=key)
+
+                entity.update(
+                    dict(
+                        key=entity_key.SerializeToString(),
+                        values={k: v.SerializeToString() for k, v in features.items()},
+                        event_ts=_make_tzaware(timestamp),
+                        created_ts=_make_tzaware(created_ts),
+                    )
+                )
+                client.put(entity)
+
+    def online_read(
+        self, project: str, table: FeatureTable, entity_key: EntityKeyProto
+    ) -> Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]:
+        client = self._initialize_client()
+
+        document_id = compute_datastore_entity_id(entity_key)
+        key = client.key("Project", project, "Table", table.name, "Row", document_id)
+        value = client.get(key)
+        if value is not None:
+            res = {}
+            for feature_name, value_bin in value["values"].items():
+                val = ValueProto()
+                val.ParseFromString(value_bin)
+                res[feature_name] = val
+            return value["event_ts"], res
+        else:
+            return None, None
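The conflict-resolution rule in `online_write_batch` can be distilled into a small standalone function. `should_overwrite` and its tuple-based row representation are inventions for this sketch, not part of the Feast code:

```python
from datetime import datetime, timezone

def make_tzaware(t: datetime) -> datetime:
    # Same convention as _make_tzaware above: tz-naive datetimes are assumed UTC.
    return t if t.tzinfo is not None else t.replace(tzinfo=timezone.utc)

def should_overwrite(existing, event_ts: datetime, created_ts: datetime) -> bool:
    """Distills the skip logic in online_write_batch.

    `existing` is None, or an (event_ts, created_ts) pair already stored in the Row.
    A write loses if the stored row came from fresher source data, or from the
    same source data but a later materialization run.
    """
    if existing is None:
        return True
    old_event, old_created = existing
    if old_event > make_tzaware(event_ts):
        return False  # stored row was computed from fresher data
    if old_event == make_tzaware(event_ts) and old_created > make_tzaware(created_ts):
        return False  # same data, but the stored row was written later
    return True
```

Because the comparisons are strict, a write with identical timestamps still goes through, matching the `continue`-only branches in the provider code.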
sdk/python/feast/infra/key_encoding_utils.py

Lines changed: 48 additions & 0 deletions

@@ -0,0 +1,48 @@
+import struct
+from typing import List, Tuple
+
+from feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
+from feast.types.Value_pb2 import Value as ValueProto
+from feast.types.Value_pb2 import ValueType
+
+
+def _serialize_val(value_type, v: ValueProto) -> Tuple[bytes, int]:
+    if value_type == "string_val":
+        return v.string_val.encode("utf8"), ValueType.STRING
+    elif value_type == "bytes_val":
+        return v.bytes_val, ValueType.BYTES
+    elif value_type == "int32_val":
+        return struct.pack("<i", v.int32_val), ValueType.INT32
+    elif value_type == "int64_val":
+        return struct.pack("<q", v.int64_val), ValueType.INT64  # full 8-byte little-endian
+    else:
+        raise ValueError(f"Value type not supported for Datastore: {v}")
+
+
+def serialize_entity_key(entity_key: EntityKeyProto) -> bytes:
+    """
+    Serialize an entity key to a bytestring so it can be used as a lookup key in a hash table.
+
+    We need this encoding to be stable; therefore we cannot just use protobuf serialization
+    here, since it does not guarantee that two proto messages containing the same data will
+    serialize to the same byte string [1].
+
+    [1] https://developers.google.com/protocol-buffers/docs/encoding
+    """
+    sorted_keys, sorted_values = zip(
+        *sorted(zip(entity_key.entity_names, entity_key.entity_values))
+    )
+
+    output: List[bytes] = []
+    for k in sorted_keys:
+        output.append(struct.pack("<I", ValueType.STRING))
+        output.append(k.encode("utf8"))
+    for v in sorted_values:
+        val_bytes, value_type = _serialize_val(v.WhichOneof("val"), v)
+
+        output.append(struct.pack("<I", value_type))
+        output.append(struct.pack("<I", len(val_bytes)))
+        output.append(val_bytes)
+
+    return b"".join(output)
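A property worth noting: because entity names (and their paired values) are serialized in sorted order, the encoding is insensitive to the order in which entities appear in the key, which is exactly the stability guarantee plain protobuf serialization does not provide. A toy demonstration (the tag value is illustrative, not the real `ValueType.STRING`):

```python
import struct

STRING_TAG = 2  # illustrative; the real tag comes from the feast.types.ValueType enum

def encode_names(names):
    # Serialize entity names in sorted order, as serialize_entity_key does.
    out = []
    for n in sorted(names):
        out.append(struct.pack("<I", STRING_TAG))
        out.append(n.encode("utf8"))
    return b"".join(out)

# Two entity keys listing the same entities in different order encode identically,
# so they hash to the same Row document id.
assert encode_names(["zone_id", "driver_id"]) == encode_names(["driver_id", "zone_id"])
```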
