Skip to content

Commit 8c2201c

Browse files
authored
Allow users to compute statistics over retrieved batch datasets (feast-dev#799)
* Refactor bigquery stats, add functionality to compute statistics over retrieved batch datasets * Rename dataset * Add documentation * Fix end to end tests * Apply spotless * Avoid comparing histograms
1 parent a1207b5 commit 8c2201c

25 files changed

Lines changed: 630 additions & 400 deletions

File tree

core/src/main/java/feast/core/service/StatsService.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import feast.proto.core.CoreServiceProto.GetFeatureStatisticsResponse;
3232
import feast.proto.core.StoreProto;
3333
import feast.proto.core.StoreProto.Store.StoreType;
34-
import feast.storage.api.statistics.FeatureSetStatistics;
34+
import feast.storage.api.statistics.FeatureStatistics;
3535
import feast.storage.api.statistics.StatisticsRetriever;
3636
import feast.storage.connectors.bigquery.statistics.BigQueryStatisticsRetriever;
3737
import java.io.IOException;
@@ -200,7 +200,7 @@ private List<FeatureNameStatistics> getFeatureNameStatisticsByDataset(
200200
// Else, add to the list of features we still need to retrieve statistics for.
201201
for (String featureName : features) {
202202
Feature feature = featureNameToFeature.get(featureName);
203-
Optional<FeatureStatistics> cachedFeatureStatistics = Optional.empty();
203+
Optional<feast.core.model.FeatureStatistics> cachedFeatureStatistics = Optional.empty();
204204
if (!forceRefresh) {
205205
cachedFeatureStatistics =
206206
featureStatisticsRepository.findFeatureStatisticsByFeatureAndDatasetId(
@@ -216,7 +216,7 @@ private List<FeatureNameStatistics> getFeatureNameStatisticsByDataset(
216216
// Retrieve the balance of statistics after checking the cache, and add it to the
217217
// list of FeatureNameStatistics.
218218
if (featuresMissingStats.size() > 0) {
219-
FeatureSetStatistics featureSetStatistics =
219+
FeatureStatistics featureSetStatistics =
220220
statisticsRetriever.getFeatureStatistics(
221221
featureSet.toProto().getSpec(), featuresMissingStats, datasetId);
222222

@@ -226,9 +226,9 @@ private List<FeatureNameStatistics> getFeatureNameStatisticsByDataset(
226226
continue;
227227
}
228228
Feature feature = featureNameToFeature.get(stat.getName());
229-
FeatureStatistics featureStatistics =
230-
FeatureStatistics.createForDataset(feature, stat, datasetId);
231-
Optional<FeatureStatistics> existingRecord =
229+
feast.core.model.FeatureStatistics featureStatistics =
230+
feast.core.model.FeatureStatistics.createForDataset(feature, stat, datasetId);
231+
Optional<feast.core.model.FeatureStatistics> existingRecord =
232232
featureStatisticsRepository.findFeatureStatisticsByFeatureAndDatasetId(
233233
featureStatistics.getFeature(), datasetId);
234234
existingRecord.ifPresent(statistics -> featureStatistics.setId(statistics.getId()));
@@ -270,7 +270,7 @@ private List<FeatureNameStatistics> getFeatureNameStatisticsByDate(
270270
// Else, add to the list of features we still need to retrieve statistics for.
271271
for (String featureName : features) {
272272
Feature feature = featureNameToFeature.get(featureName);
273-
Optional<FeatureStatistics> cachedFeatureStatistics = Optional.empty();
273+
Optional<feast.core.model.FeatureStatistics> cachedFeatureStatistics = Optional.empty();
274274
if (!forceRefresh) {
275275
cachedFeatureStatistics =
276276
featureStatisticsRepository.findFeatureStatisticsByFeatureAndDate(feature, date);
@@ -285,7 +285,7 @@ private List<FeatureNameStatistics> getFeatureNameStatisticsByDate(
285285
// Retrieve the balance of statistics after checking the cache, and add it to the
286286
// list of FeatureNameStatistics.
287287
if (featuresMissingStats.size() > 0) {
288-
FeatureSetStatistics featureSetStatistics =
288+
FeatureStatistics featureSetStatistics =
289289
statisticsRetriever.getFeatureStatistics(
290290
featureSet.toProto().getSpec(),
291291
featuresMissingStats,
@@ -297,8 +297,9 @@ private List<FeatureNameStatistics> getFeatureNameStatisticsByDate(
297297
continue;
298298
}
299299
Feature feature = featureNameToFeature.get(stat.getName());
300-
FeatureStatistics featureStatistics = FeatureStatistics.createForDate(feature, stat, date);
301-
Optional<FeatureStatistics> existingRecord =
300+
feast.core.model.FeatureStatistics featureStatistics =
301+
feast.core.model.FeatureStatistics.createForDate(feature, stat, date);
302+
Optional<feast.core.model.FeatureStatistics> existingRecord =
302303
featureStatisticsRepository.findFeatureStatisticsByFeatureAndDate(
303304
featureStatistics.getFeature(), date);
304305
existingRecord.ifPresent(statistics -> featureStatistics.setId(statistics.getId()));

docs/user-guide/feature-retrieval.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,38 @@ Feast can retrieve features from any amount of feature sets, as long as they occ
107107

108108
Point-in-time-correct joins also prevent feature leakage by trying to accurately recreate the state of the world at a single point in time, instead of just joining features based on the nearest timestamps.
109109

110+
### **Computing statistics over retrieved data**
111+
112+
Feast is able to compute [TFDV](https://tensorflow.google.cn/tfx/tutorials/data_validation/tfdv_basic) compatible statistics over data retrieved from historical stores. The statistics can be used in conjunction with feature schemas and TFDV to verify the integrity of your retrieved dataset, or with [Facets](https://github.com/PAIR-code/facets) to visualize the distribution.
113+
114+
The computation of statistics is not enabled by default. To indicate to Feast that the statistics are to be computed for a given historical retrieval request, pass `compute_statistics=True` to `get_batch_features`.
115+
116+
```python
117+
dataset = client.get_batch_features(
118+
feature_refs=features,
119+
entity_rows=entity_df,
120+
compute_statistics=True
121+
)
122+
123+
stats = dataset.statistics()
124+
```
125+
126+
If a schema is already defined over the feature sets in question, TFDV can be used to detect anomalies over the dataset.
127+
128+
```python
129+
# Build combined schema over retrieved dataset
130+
schema = schema_pb2.Schema()
131+
for feature_set in feature_sets:
132+
fs_schema = feature_set.export_tfx_schema()
133+
for feature_schema in fs_schema.feature:
134+
if feature_schema.name in features:
135+
schema.feature.append(feature_schema)
136+
137+
# detect anomalies
138+
anomalies = tfdv.validate_statistics(statistics=stats, schema=schema)
139+
```
140+
141+
110142
## Online feature retrieval
111143

112144
Online feature retrieval works in much the same way as batch retrieval, with one important distinction: Online stores only maintain the current state of features. No historical data is served.

protos/feast/serving/ServingService.proto

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package feast.serving;
2020

2121
import "google/protobuf/timestamp.proto";
2222
import "feast/types/Value.proto";
23+
import "tensorflow_metadata/proto/v0/statistics.proto";
2324

2425
option java_package = "feast.proto.serving";
2526
option java_outer_classname = "ServingAPIProto";
@@ -100,6 +101,18 @@ message GetOnlineFeaturesRequest {
100101
}
101102
}
102103

104+
message GetBatchFeaturesRequest {
105+
// List of features that are being retrieved
106+
repeated FeatureReference features = 3;
107+
108+
// Source of the entity dataset containing the timestamps and entity keys to retrieve
109+
// features for.
110+
DatasetSource dataset_source = 2;
111+
112+
// Compute statistics for the dataset retrieved
113+
bool compute_statistics = 4;
114+
}
115+
103116
message GetOnlineFeaturesResponse {
104117
// Feature values retrieved from feast.
105118
repeated FieldValues field_values = 1;
@@ -134,15 +147,6 @@ message GetOnlineFeaturesResponse {
134147
}
135148
}
136149

137-
message GetBatchFeaturesRequest {
138-
// List of features that are being retrieved
139-
repeated FeatureReference features = 3;
140-
141-
// Source of the entity dataset containing the timestamps and entity keys to retrieve
142-
// features for.
143-
DatasetSource dataset_source = 2;
144-
}
145-
146150
message GetBatchFeaturesResponse {
147151
Job job = 1;
148152
}
@@ -196,6 +200,9 @@ message Job {
196200
// Output only. The data format for all the files.
197201
// For CSV format, the files contain both feature values and a column header.
198202
DataFormat data_format = 6;
203+
// Output only. The statistics computed over
204+
// the retrieved dataset. Only available for BigQuery stores.
205+
tensorflow.metadata.v0.DatasetFeatureStatisticsList dataset_feature_statistics_list = 7;
199206
}
200207

201208
message DatasetSource {

sdk/python/feast/client.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,10 @@
5151
GetFeatureSetRequest,
5252
GetFeatureSetResponse,
5353
GetFeatureStatisticsRequest,
54-
ListFeaturesRequest,
55-
ListFeaturesResponse,
5654
ListFeatureSetsRequest,
5755
ListFeatureSetsResponse,
56+
ListFeaturesRequest,
57+
ListFeaturesResponse,
5858
ListIngestionJobsRequest,
5959
ListProjectsRequest,
6060
ListProjectsResponse,
@@ -561,6 +561,7 @@ def get_batch_features(
561561
self,
562562
feature_refs: List[str],
563563
entity_rows: Union[pd.DataFrame, str],
564+
compute_statistics: bool = False,
564565
project: str = None,
565566
) -> RetrievalJob:
566567
"""
@@ -577,6 +578,8 @@ def get_batch_features(
577578
Each entity in a feature set must be present as a column in this
578579
dataframe. The datetime column must contain timestamps in
579580
datetime64 format.
581+
compute_statistics (bool):
582+
Indicates whether Feast should compute statistics over the retrieved dataset.
580583
project: Specifies the project which contains the FeatureSets
581584
which the requested features belong to.
582585
@@ -656,6 +659,7 @@ def get_batch_features(
656659
file_uris=staged_files, data_format=DataFormat.DATA_FORMAT_AVRO
657660
)
658661
),
662+
compute_statistics=compute_statistics,
659663
)
660664

661665
# Retrieve Feast Job object to manage life cycle of retrieval

sdk/python/feast/job.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from feast.serving.ServingService_pb2_grpc import ServingServiceStub
2525
from feast.source import Source
2626
from feast.wait import wait_retry_backoff
27+
from tensorflow_metadata.proto.v0 import statistics_pb2
2728

2829

2930
class RetrievalJob:
@@ -193,6 +194,26 @@ def to_chunked_dataframe(
193194
def __iter__(self):
194195
return iter(self.result())
195196

197+
def statistics(
198+
self, timeout_sec: int = int(defaults[CONFIG_TIMEOUT_KEY])
199+
) -> statistics_pb2.DatasetFeatureStatisticsList:
200+
"""
201+
Get statistics computed over the retrieved data set. Statistics will only be computed for
202+
columns that are part of Feast, and not the columns that were provided.
203+
204+
Args:
205+
timeout_sec (int):
206+
Max no of seconds to wait until job is done. If "timeout_sec"
207+
is exceeded, an exception will be raised.
208+
209+
Returns:
210+
DatasetFeatureStatisticsList containing statistics of Feast features over the retrieved dataset.
211+
"""
212+
self.get_avro_files(timeout_sec) # wait for job completion
213+
if self.job_proto.error:
214+
raise Exception(self.job_proto.error)
215+
return self.job_proto.dataset_feature_statistics_list
216+
196217

197218
class IngestJob:
198219
"""

sdk/python/tests/test_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@
3434
from feast.core.CoreService_pb2 import (
3535
GetFeastCoreVersionResponse,
3636
GetFeatureSetResponse,
37-
ListFeaturesResponse,
3837
ListFeatureSetsResponse,
38+
ListFeaturesResponse,
3939
ListIngestionJobsResponse,
4040
)
4141
from feast.core.FeatureSet_pb2 import EntitySpec as EntitySpecProto

serving/src/main/java/feast/serving/service/HistoricalServingService.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,10 @@ public GetBatchFeaturesResponse getBatchFeatures(GetBatchFeaturesRequest getFeat
8181
public void run() {
8282
HistoricalRetrievalResult result =
8383
retriever.getHistoricalFeatures(
84-
retrievalId, getFeaturesRequest.getDatasetSource(), featureSetRequests);
84+
retrievalId,
85+
getFeaturesRequest.getDatasetSource(),
86+
featureSetRequests,
87+
getFeaturesRequest.getComputeStatistics());
8588
jobService.upsert(resultToJob(result));
8689
}
8790
});
@@ -111,9 +114,11 @@ private Job resultToJob(HistoricalRetrievalResult result) {
111114
if (result.hasError()) {
112115
return builder.setError(result.getError()).build();
113116
}
114-
return builder
115-
.addAllFileUris(result.getFileUris())
116-
.setDataFormat(result.getDataFormat())
117-
.build();
117+
Builder jobBuilder =
118+
builder.addAllFileUris(result.getFileUris()).setDataFormat(result.getDataFormat());
119+
if (result.getStats() != null) {
120+
jobBuilder.setDatasetFeatureStatisticsList(result.getStats());
121+
}
122+
return builder.build();
118123
}
119124
}

storage/api/src/main/java/feast/storage/api/retriever/HistoricalRetrievalResult.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.io.Serializable;
2323
import java.util.List;
2424
import javax.annotation.Nullable;
25+
import org.tensorflow.metadata.v0.DatasetFeatureStatisticsList;
2526

2627
/** Result of a historical feature retrieval request. */
2728
@AutoValue
@@ -40,6 +41,9 @@ public abstract class HistoricalRetrievalResult implements Serializable {
4041
@Nullable
4142
public abstract DataFormat getDataFormat();
4243

44+
@Nullable
45+
public abstract DatasetFeatureStatisticsList getStats();
46+
4347
/**
4448
* Instantiates a {@link HistoricalRetrievalResult} indicating that the retrieval was a failure,
4549
* together with its associated error.
@@ -75,10 +79,29 @@ public static HistoricalRetrievalResult success(
7579
.build();
7680
}
7781

82+
/**
83+
* Adds statistics to the result
84+
*
85+
* @param stats {@link DatasetFeatureStatisticsList} for the retrieved dataset
86+
* @return {@link HistoricalRetrievalResult}
87+
*/
88+
public HistoricalRetrievalResult withStats(DatasetFeatureStatisticsList stats) {
89+
return toBuilder().setStats(stats).build();
90+
}
91+
7892
static Builder newBuilder() {
7993
return new AutoValue_HistoricalRetrievalResult.Builder();
8094
}
8195

96+
Builder toBuilder() {
97+
return newBuilder()
98+
.setId(getId())
99+
.setStatus(getStatus())
100+
.setFileUris(getFileUris())
101+
.setError(getError())
102+
.setDataFormat(getDataFormat());
103+
}
104+
82105
@AutoValue.Builder
83106
abstract static class Builder {
84107
abstract Builder setId(String id);
@@ -91,6 +114,8 @@ abstract static class Builder {
91114

92115
abstract Builder setDataFormat(DataFormat dataFormat);
93116

117+
abstract Builder setStats(DatasetFeatureStatisticsList stats);
118+
94119
abstract HistoricalRetrievalResult build();
95120
}
96121

storage/api/src/main/java/feast/storage/api/retriever/HistoricalRetriever.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,13 @@ public interface HistoricalRetriever {
4141
* entity columns.
4242
* @param featureSetRequests List of {@link FeatureSetRequest} to feature references in the
4343
* request tied to that feature set.
44+
* @param computeStatistics whether to compute statistics over the resultant dataset.
4445
* @return {@link HistoricalRetrievalResult} if successful, contains the location of the results,
4546
* else contains the error to be returned to the user.
4647
*/
4748
HistoricalRetrievalResult getHistoricalFeatures(
48-
String retrievalId, DatasetSource datasetSource, List<FeatureSetRequest> featureSetRequests);
49+
String retrievalId,
50+
DatasetSource datasetSource,
51+
List<FeatureSetRequest> featureSetRequests,
52+
boolean computeStatistics);
4953
}

storage/api/src/main/java/feast/storage/api/statistics/FeatureSetStatistics.java renamed to storage/api/src/main/java/feast/storage/api/statistics/FeatureStatistics.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,16 @@
2020
import com.google.common.collect.ImmutableList;
2121
import org.tensorflow.metadata.v0.FeatureNameStatistics;
2222

23-
/** Feature statistics for a feature set over a bounded set of data. */
23+
/** Feature statistics over a bounded set of data. */
2424
@AutoValue
25-
public abstract class FeatureSetStatistics {
25+
public abstract class FeatureStatistics {
2626

2727
public abstract long getNumExamples();
2828

2929
public abstract ImmutableList<FeatureNameStatistics> getFeatureNameStatistics();
3030

3131
public static Builder newBuilder() {
32-
return new AutoValue_FeatureSetStatistics.Builder();
32+
return new AutoValue_FeatureStatistics.Builder();
3333
}
3434

3535
@AutoValue.Builder
@@ -43,6 +43,6 @@ public Builder addFeatureNameStatistics(FeatureNameStatistics featureNameStatist
4343
return this;
4444
}
4545

46-
public abstract FeatureSetStatistics build();
46+
public abstract FeatureStatistics build();
4747
}
4848
}

0 commit comments

Comments
 (0)