Skip to content

Commit 6f1174a

Browse files
authored
Persisting results of historical retrieval (#2197)
* persisting results of historical retrieval Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * fix after rebase Signed-off-by: pyalex <moskalenko.alexey@gmail.com>
1 parent d7707c1 commit 6f1174a

File tree

25 files changed

+1262
-262
lines changed

25 files changed

+1262
-262
lines changed

docs/SUMMARY.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
* [Feature service](getting-started/concepts/feature-service.md)
1717
* [Feature retrieval](getting-started/concepts/feature-retrieval.md)
1818
* [Point-in-time joins](getting-started/concepts/point-in-time-joins.md)
19+
* [Dataset](getting-started/concepts/dataset.md)
1920
* [Architecture](getting-started/architecture-and-components/README.md)
2021
* [Overview](getting-started/architecture-and-components/overview.md)
2122
* [Feature repository](getting-started/architecture-and-components/feature-repository.md)

docs/getting-started/concepts/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,4 @@
1414

1515
{% page-ref page="point-in-time-joins.md" %}
1616

17+
{% page-ref page="dataset.md" %}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# Dataset
2+
3+
Feast datasets allow for conveniently saving dataframes that include both features and entities to be subsequently used for data analysis and model training.
[Data Quality Monitoring](https://docs.google.com/document/d/110F72d4NTv80p35wDSONxhhPBqWRwbZXG4f9mNEMd98) was the primary motivation for creating the dataset concept.
5+
6+
Dataset's metadata is stored in the Feast registry and raw data (features, entities, additional input keys and timestamp) is stored in the [offline store](../architecture-and-components/offline-store.md).
7+
8+
Dataset can be created from:
9+
1. Results of historical retrieval
10+
2. [planned] Logging request (including input for [on demand transformation](../../reference/alpha-on-demand-feature-view.md)) and response during feature serving
11+
3. [planned] Logging features during writing to online store (from batch source or stream)
12+
13+
14+
### Creating Saved Dataset from Historical Retrieval
15+
16+
To create a saved dataset from historical features for later retrieval or analysis, a user needs to call the `get_historical_features` method first and then pass the returned retrieval job to the `create_saved_dataset` method.
`create_saved_dataset` will trigger the provided retrieval job (by calling `.persist()` on it) to store the data using the specified `storage`.
The storage type must be the same as the globally configured offline store (e.g., it is impossible to persist data to Redshift when using a BigQuery source).
19+
`create_saved_dataset` will also create SavedDataset object with all related metadata and will write it to the registry.
20+
21+
```python
22+
from feast import FeatureStore
23+
from feast.infra.offline_stores.bigquery_source import SavedDatasetBigQueryStorage
24+
25+
store = FeatureStore()
26+
27+
historical_job = store.get_historical_features(
28+
features=["driver:avg_trip"],
29+
entity_df=...,
30+
)
31+
32+
dataset = store.create_saved_dataset(
33+
from_=historical_job,
34+
name='my_training_dataset',
35+
storage=SavedDatasetBigQueryStorage(table_ref='<gcp-project>.<gcp-dataset>.my_training_dataset'),
36+
tags={'author': 'oleksii'}
37+
)
38+
39+
dataset.to_df()
40+
```
41+
A saved dataset can later be retrieved using the `get_saved_dataset` method:
43+
```python
44+
dataset = store.get_saved_dataset('my_training_dataset')
45+
dataset.to_df()
46+
```

protos/feast/core/Registry.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import "feast/core/FeatureView.proto";
2828
import "feast/core/InfraObject.proto";
2929
import "feast/core/OnDemandFeatureView.proto";
3030
import "feast/core/RequestFeatureView.proto";
31+
import "feast/core/SavedDataset.proto";
3132
import "google/protobuf/timestamp.proto";
3233

3334
message Registry {
@@ -37,6 +38,7 @@ message Registry {
3738
repeated OnDemandFeatureView on_demand_feature_views = 8;
3839
repeated RequestFeatureView request_feature_views = 9;
3940
repeated FeatureService feature_services = 7;
41+
repeated SavedDataset saved_datasets = 11;
4042
Infra infra = 10;
4143

4244
string registry_schema_version = 3; // to support migrations; incremented when schema is changed
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
//
2+
// Copyright 2021 The Feast Authors
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// https://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
17+
18+
syntax = "proto3";
19+
20+
package feast.core;
21+
option java_package = "feast.proto.core";
22+
option java_outer_classname = "SavedDatasetProto";
23+
option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";
24+
25+
import "google/protobuf/timestamp.proto";
26+
import "feast/core/FeatureViewProjection.proto";
27+
import "feast/core/DataSource.proto";
28+
29+
message SavedDatasetSpec {
30+
// Name of the dataset. Must be unique since it's possible to overwrite dataset by name
31+
string name = 1;
32+
33+
// Name of Feast project that this Dataset belongs to.
34+
string project = 2;
35+
36+
// list of feature references with format "<view name>:<feature name>"
37+
repeated string features = 3;
38+
39+
// entity columns + request columns from all feature views used during retrieval
40+
repeated string join_keys = 4;
41+
42+
// Whether full feature names are used in stored data
43+
bool full_feature_names = 5;
44+
45+
SavedDatasetStorage storage = 6;
46+
47+
// User defined metadata
48+
map<string, string> tags = 7;
49+
}
50+
51+
message SavedDatasetStorage {
52+
oneof kind {
53+
DataSource.FileOptions file_storage = 4;
54+
DataSource.BigQueryOptions bigquery_storage = 5;
55+
DataSource.RedshiftOptions redshift_storage = 6;
56+
}
57+
}
58+
59+
message SavedDatasetMeta {
60+
// Time when this saved dataset is created
61+
google.protobuf.Timestamp created_timestamp = 1;
62+
63+
// Time when this saved dataset is last updated
64+
google.protobuf.Timestamp last_updated_timestamp = 2;
65+
66+
// Min timestamp in the dataset (needed for retrieval)
67+
google.protobuf.Timestamp min_event_timestamp = 3;
68+
69+
// Max timestamp in the dataset (needed for retrieval)
70+
google.protobuf.Timestamp max_event_timestamp = 4;
71+
}
72+
73+
message SavedDataset {
74+
SavedDatasetSpec spec = 1;
75+
SavedDatasetMeta meta = 2;
76+
}

sdk/python/feast/errors.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ def __init__(self, bucket):
7474
super().__init__(f"S3 bucket {bucket} for the Feast registry can't be accessed")
7575

7676

class SavedDatasetNotFound(FeastObjectNotFoundException):
    """Raised when a saved dataset with the given name does not exist in the project's registry."""

    def __init__(self, name: str, project: str):
        super().__init__(f"Saved dataset {name} does not exist in project {project}")
7782
class FeastProviderLoginError(Exception):
7883
"""Error class that indicates a user has not authenticated with their provider."""
7984

sdk/python/feast/feature_store.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
from feast.repo_config import RepoConfig, load_repo_config
7878
from feast.repo_contents import RepoContents
7979
from feast.request_feature_view import RequestFeatureView
80+
from feast.saved_dataset import SavedDataset, SavedDatasetStorage
8081
from feast.type_map import python_values_to_proto_values
8182
from feast.usage import log_exceptions, log_exceptions_and_usage, set_usage_attribute
8283
from feast.value_type import ValueType
@@ -764,6 +765,93 @@ def get_historical_features(
764765

765766
return job
766767

    @log_exceptions_and_usage
    def create_saved_dataset(
        self,
        from_: RetrievalJob,
        name: str,
        storage: SavedDatasetStorage,
        tags: Optional[Dict[str, str]] = None,
    ) -> SavedDataset:
        """
        Execute provided retrieval job and persist its outcome in given storage.

        Storage type (eg, BigQuery or Redshift) must be the same as globally configured
        offline store. After data successfully persisted saved dataset object with dataset
        metadata is committed to the registry. Name for the saved dataset should be unique
        within project, since it's possible to overwrite previously stored dataset with
        the same name.

        Args:
            from_: Retrieval job produced by `get_historical_features`; must carry metadata.
            name: Name of the saved dataset, unique within the project.
            storage: Destination for the persisted rows; must match the configured
                offline store type.
            tags: Optional user-defined metadata attached to the dataset.

        Returns:
            SavedDataset object with attached RetrievalJob

        Raises:
            ValueError if given retrieval job doesn't have metadata
        """
        # Experimental API: warn callers on every use.
        warnings.warn(
            "Saving dataset is an experimental feature. "
            "This API is unstable and it could and most probably will be changed in the future. "
            "We do not guarantee that future changes will maintain backward compatibility.",
            RuntimeWarning,
        )

        # Metadata (features, join keys, event-timestamp bounds) is required to
        # build the registry entry; only jobs from get_historical_features have it.
        if not from_.metadata:
            raise ValueError(
                "RetrievalJob must contains metadata. "
                "Use RetrievalJob produced by get_historical_features"
            )

        dataset = SavedDataset(
            name=name,
            features=from_.metadata.features,
            join_keys=from_.metadata.keys,
            full_feature_names=from_.full_feature_names,
            storage=storage,
            tags=tags,
        )

        # Event-timestamp bounds are stored so the dataset can later be pulled
        # back from the offline store (see SavedDatasetMeta in the registry proto).
        dataset.min_event_timestamp = from_.metadata.min_event_timestamp
        dataset.max_event_timestamp = from_.metadata.max_event_timestamp

        # Persist the data first; the registry entry is committed only after
        # persistence succeeds, so the registry never points at missing data.
        from_.persist(storage)

        self._registry.apply_saved_dataset(dataset, self.project, commit=True)

        # Attach a retrieval job reading back from the just-persisted storage,
        # so callers can immediately do e.g. dataset.to_df().
        return dataset.with_retrieval_job(
            self._get_provider().retrieve_saved_dataset(
                config=self.config, dataset=dataset
            )
        )
823+
824+
@log_exceptions_and_usage
825+
def get_saved_dataset(self, name: str) -> SavedDataset:
826+
"""
827+
Find a saved dataset in the registry by provided name and
828+
create a retrieval job to pull whole dataset from storage (offline store).
829+
830+
If dataset couldn't be found by provided name SavedDatasetNotFound exception will be raised.
831+
832+
Data will be retrieved from globally configured offline store.
833+
834+
Returns:
835+
SavedDataset with RetrievalJob attached
836+
837+
Raises:
838+
SavedDatasetNotFound
839+
"""
840+
warnings.warn(
841+
"Retrieving datasets is an experimental feature. "
842+
"This API is unstable and it could and most probably will be changed in the future. "
843+
"We do not guarantee that future changes will maintain backward compatibility.",
844+
RuntimeWarning,
845+
)
846+
847+
dataset = self._registry.get_saved_dataset(name, self.project)
848+
provider = self._get_provider()
849+
850+
retrieval_job = provider.retrieve_saved_dataset(
851+
config=self.config, dataset=dataset
852+
)
853+
return dataset.with_retrieval_job(retrieval_job)
854+
767855
@log_exceptions_and_usage
768856
def materialize_incremental(
769857
self, end_date: datetime, feature_views: Optional[List[str]] = None,

0 commit comments

Comments
 (0)