Skip to content

Commit 1b2e3a6

Browse files
zhilingczhilingc
authored andcommitted
Add method to retrieve statistics over historical data
1 parent ef47f3c commit 1b2e3a6

File tree

23 files changed

+2864
-31
lines changed

23 files changed

+2864
-31
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright 2018-2019 The Feast Authors
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package feast.core.dao;
18+
19+
import feast.core.model.Feature;
20+
import feast.core.model.FeatureStatistics;
21+
import java.util.Date;
22+
import java.util.Optional;
23+
import org.springframework.data.jpa.repository.JpaRepository;
24+
25+
/** JPA repository supplying Statistics objects for features keyed by id. */
26+
public interface FeatureStatisticsRepository extends JpaRepository<FeatureStatistics, Integer> {
27+
Optional<FeatureStatistics> findFeatureStatisticsByFeatureAndDatasetId(
28+
Feature feature, String datasetId);
29+
30+
Optional<FeatureStatistics> findFeatureStatisticsByFeatureAndDate(Feature feature, Date date);
31+
}

core/src/main/java/feast/core/grpc/CoreServiceImpl.java

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -25,31 +25,9 @@
2525
import feast.core.service.AccessManagementService;
2626
import feast.core.service.JobService;
2727
import feast.core.service.SpecService;
28+
import feast.core.service.StatsService;
2829
import feast.proto.core.CoreServiceGrpc.CoreServiceImplBase;
29-
import feast.proto.core.CoreServiceProto.ApplyFeatureSetRequest;
30-
import feast.proto.core.CoreServiceProto.ApplyFeatureSetResponse;
31-
import feast.proto.core.CoreServiceProto.ArchiveProjectRequest;
32-
import feast.proto.core.CoreServiceProto.ArchiveProjectResponse;
33-
import feast.proto.core.CoreServiceProto.CreateProjectRequest;
34-
import feast.proto.core.CoreServiceProto.CreateProjectResponse;
35-
import feast.proto.core.CoreServiceProto.GetFeastCoreVersionRequest;
36-
import feast.proto.core.CoreServiceProto.GetFeastCoreVersionResponse;
37-
import feast.proto.core.CoreServiceProto.GetFeatureSetRequest;
38-
import feast.proto.core.CoreServiceProto.GetFeatureSetResponse;
39-
import feast.proto.core.CoreServiceProto.ListFeatureSetsRequest;
40-
import feast.proto.core.CoreServiceProto.ListFeatureSetsResponse;
41-
import feast.proto.core.CoreServiceProto.ListIngestionJobsRequest;
42-
import feast.proto.core.CoreServiceProto.ListIngestionJobsResponse;
43-
import feast.proto.core.CoreServiceProto.ListProjectsRequest;
44-
import feast.proto.core.CoreServiceProto.ListProjectsResponse;
45-
import feast.proto.core.CoreServiceProto.ListStoresRequest;
46-
import feast.proto.core.CoreServiceProto.ListStoresResponse;
47-
import feast.proto.core.CoreServiceProto.RestartIngestionJobRequest;
48-
import feast.proto.core.CoreServiceProto.RestartIngestionJobResponse;
49-
import feast.proto.core.CoreServiceProto.StopIngestionJobRequest;
50-
import feast.proto.core.CoreServiceProto.StopIngestionJobResponse;
51-
import feast.proto.core.CoreServiceProto.UpdateStoreRequest;
52-
import feast.proto.core.CoreServiceProto.UpdateStoreResponse;
30+
import feast.proto.core.CoreServiceProto.*;
5331
import io.grpc.Status;
5432
import io.grpc.StatusRuntimeException;
5533
import io.grpc.stub.StreamObserver;
@@ -69,11 +47,13 @@ public class CoreServiceImpl extends CoreServiceImplBase {
6947
private SpecService specService;
7048
private AccessManagementService accessManagementService;
7149
private JobService jobService;
50+
private StatsService statsService;
7251

7352
@Autowired
7453
public CoreServiceImpl(
7554
SpecService specService,
7655
AccessManagementService accessManagementService,
56+
StatsService statsService,
7757
JobService jobService,
7858
FeastProperties feastProperties) {
7959
this.specService = specService;
@@ -126,6 +106,32 @@ public void listFeatureSets(
126106
}
127107
}
128108

109+
@Override
110+
public void getFeatureStatistics(
111+
GetFeatureStatisticsRequest request,
112+
StreamObserver<GetFeatureStatisticsResponse> responseObserver) {
113+
try {
114+
GetFeatureStatisticsResponse response = statsService.getFeatureStatistics(request);
115+
responseObserver.onNext(response);
116+
responseObserver.onCompleted();
117+
} catch (IllegalArgumentException e) {
118+
log.error("Illegal arguments provided to GetFeatureStatistics method: ", e);
119+
responseObserver.onError(
120+
Status.INVALID_ARGUMENT
121+
.withDescription(e.getMessage())
122+
.withCause(e)
123+
.asRuntimeException());
124+
} catch (RetrievalException e) {
125+
log.error("Unable to fetch feature set requested in GetFeatureStatistics method: ", e);
126+
responseObserver.onError(
127+
Status.NOT_FOUND.withDescription(e.getMessage()).withCause(e).asRuntimeException());
128+
} catch (Exception e) {
129+
log.error("Exception has occurred in GetFeatureStatistics method: ", e);
130+
responseObserver.onError(
131+
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
132+
}
133+
}
134+
129135
@Override
130136
public void listStores(
131137
ListStoresRequest request, StreamObserver<ListStoresResponse> responseObserver) {
Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright 2018-2020 The Feast Authors
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package feast.core.model;
18+
19+
import com.google.protobuf.InvalidProtocolBufferException;
20+
import java.io.*;
21+
import java.util.Date;
22+
import java.util.List;
23+
import javax.persistence.*;
24+
import javax.persistence.Entity;
25+
import lombok.Getter;
26+
import lombok.NoArgsConstructor;
27+
import lombok.Setter;
28+
import org.tensorflow.metadata.v0.*;
29+
30+
@NoArgsConstructor
31+
@Getter
32+
@Setter
33+
@Entity
34+
@Table(
35+
name = "feature_statistics",
36+
indexes = {
37+
@Index(name = "idx_feature_statistics_feature", columnList = "feature_id"),
38+
@Index(name = "idx_feature_statistics_dataset_id", columnList = "datasetId"),
39+
@Index(name = "idx_feature_statistics_date", columnList = "date"),
40+
})
41+
public class FeatureStatistics {
42+
@Id
43+
@GeneratedValue(strategy = GenerationType.AUTO)
44+
private int id;
45+
46+
@ManyToOne private Feature feature;
47+
48+
// Only one of these fields should be populated.
49+
private String datasetId;
50+
private Date date;
51+
52+
// General statistics
53+
private String featureType;
54+
private long count;
55+
private long numMissing;
56+
private long minNumValues;
57+
private long maxNumValues;
58+
private float avgNumValues;
59+
private long totalNumValues;
60+
private byte[] numValuesHistogram;
61+
62+
// Numeric statistics
63+
private double mean;
64+
private double stdev;
65+
private long zeroes;
66+
private double min;
67+
private double max;
68+
private double median;
69+
private byte[] numericValueHistogram;
70+
private byte[] numericValueQuantiles;
71+
72+
// String statistics
73+
@Column(name = "n_unique")
74+
private long unique;
75+
76+
private float averageLength;
77+
private byte[] rankHistogram;
78+
private byte[] topValues;
79+
80+
// Byte statistics
81+
private float minBytes;
82+
private float maxBytes;
83+
private float avgBytes;
84+
85+
// Instantiates a Statistics object from a tensorflow metadata FeatureNameStatistics object and a
86+
// dataset ID.
87+
public static FeatureStatistics createForDataset(
88+
Feature feature, FeatureNameStatistics featureNameStatistics, String datasetId)
89+
throws IOException {
90+
FeatureStatistics featureStatistics = FeatureStatistics.fromProto(featureNameStatistics);
91+
featureStatistics.setFeature(feature);
92+
featureStatistics.setDatasetId(datasetId);
93+
return featureStatistics;
94+
}
95+
96+
// Instantiates a Statistics object from a tensorflow metadata FeatureNameStatistics object and a
97+
// date.
98+
public static FeatureStatistics createForDate(
99+
Feature feature, FeatureNameStatistics featureNameStatistics, Date date) throws IOException {
100+
FeatureStatistics featureStatistics = FeatureStatistics.fromProto(featureNameStatistics);
101+
featureStatistics.setDate(date);
102+
featureStatistics.setFeature(feature);
103+
return featureStatistics;
104+
}
105+
106+
public FeatureNameStatistics toProto() throws InvalidProtocolBufferException {
107+
FeatureNameStatistics.Builder featureNameStatisticsBuilder =
108+
FeatureNameStatistics.newBuilder()
109+
.setType(FeatureNameStatistics.Type.valueOf(featureType))
110+
.setPath(Path.newBuilder().addStep(feature.getName()));
111+
CommonStatistics commonStatistics =
112+
CommonStatistics.newBuilder()
113+
.setNumNonMissing(count - numMissing)
114+
.setNumMissing(numMissing)
115+
.setMaxNumValues(maxNumValues)
116+
.setMinNumValues(minNumValues)
117+
.setTotNumValues(totalNumValues)
118+
.setNumValuesHistogram(Histogram.parseFrom(numValuesHistogram))
119+
.build();
120+
121+
switch (featureNameStatisticsBuilder.getType()) {
122+
case INT:
123+
case FLOAT:
124+
NumericStatistics numStats =
125+
NumericStatistics.newBuilder()
126+
.setCommonStats(commonStatistics)
127+
.setMean(mean)
128+
.setStdDev(stdev)
129+
.setNumZeros(zeroes)
130+
.setMin(min)
131+
.setMax(max)
132+
.setMedian(median)
133+
.addHistograms(Histogram.parseFrom(numericValueHistogram))
134+
.addHistograms(Histogram.parseFrom(numericValueQuantiles))
135+
.build();
136+
featureNameStatisticsBuilder.setNumStats(numStats);
137+
break;
138+
case STRING:
139+
StringStatistics.Builder stringStats =
140+
StringStatistics.newBuilder()
141+
.setCommonStats(commonStatistics)
142+
.setUnique(unique)
143+
.setAvgLength(averageLength);
144+
if (rankHistogram == null) {
145+
stringStats.setRankHistogram(RankHistogram.getDefaultInstance());
146+
} else {
147+
stringStats.setRankHistogram(RankHistogram.parseFrom(rankHistogram));
148+
}
149+
try (ByteArrayInputStream bis = new ByteArrayInputStream(topValues)) {
150+
ObjectInputStream ois = new ObjectInputStream(bis);
151+
List<StringStatistics.FreqAndValue> freqAndValueList =
152+
(List<StringStatistics.FreqAndValue>) ois.readObject();
153+
stringStats.addAllTopValues(freqAndValueList);
154+
} catch (IOException | ClassNotFoundException e) {
155+
throw new InvalidProtocolBufferException(
156+
"Failed to parse field: StringStatistics.TopValues. Check if the value is malformed.");
157+
}
158+
featureNameStatisticsBuilder.setStringStats(stringStats);
159+
break;
160+
case BYTES:
161+
BytesStatistics bytesStats =
162+
BytesStatistics.newBuilder()
163+
.setCommonStats(commonStatistics)
164+
.setAvgNumBytes(avgBytes)
165+
.setMinNumBytes(minBytes)
166+
.setMaxNumBytes(maxBytes)
167+
.build();
168+
featureNameStatisticsBuilder.setBytesStats(bytesStats);
169+
break;
170+
case STRUCT:
171+
StructStatistics structStats =
172+
StructStatistics.newBuilder().setCommonStats(commonStatistics).build();
173+
featureNameStatisticsBuilder.setStructStats(structStats);
174+
break;
175+
}
176+
return featureNameStatisticsBuilder.build();
177+
}
178+
179+
private static FeatureStatistics fromProto(FeatureNameStatistics featureNameStatistics)
180+
throws IOException, IllegalArgumentException {
181+
FeatureStatistics featureStatistics = new FeatureStatistics();
182+
featureStatistics.setFeatureType(featureNameStatistics.getType().toString());
183+
CommonStatistics commonStats;
184+
switch (featureNameStatistics.getType()) {
185+
case FLOAT:
186+
case INT:
187+
NumericStatistics numStats = featureNameStatistics.getNumStats();
188+
commonStats = numStats.getCommonStats();
189+
featureStatistics.setMean(numStats.getMean());
190+
featureStatistics.setStdev(numStats.getStdDev());
191+
featureStatistics.setZeroes(numStats.getNumZeros());
192+
featureStatistics.setMin(numStats.getMin());
193+
featureStatistics.setMax(numStats.getMax());
194+
featureStatistics.setMedian(numStats.getMedian());
195+
for (Histogram histogram : numStats.getHistogramsList()) {
196+
switch (histogram.getType()) {
197+
case STANDARD:
198+
featureStatistics.setNumericValueHistogram(histogram.toByteArray());
199+
case QUANTILES:
200+
featureStatistics.setNumericValueQuantiles(histogram.toByteArray());
201+
default:
202+
// invalid type, dropping the values
203+
}
204+
}
205+
break;
206+
case STRING:
207+
StringStatistics stringStats = featureNameStatistics.getStringStats();
208+
commonStats = stringStats.getCommonStats();
209+
featureStatistics.setUnique(stringStats.getUnique());
210+
featureStatistics.setAverageLength(stringStats.getAvgLength());
211+
featureStatistics.setRankHistogram(stringStats.getRankHistogram().toByteArray());
212+
try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
213+
ObjectOutputStream oos = new ObjectOutputStream(bos);
214+
oos.writeObject(stringStats.getTopValuesList());
215+
featureStatistics.setTopValues(bos.toByteArray());
216+
}
217+
break;
218+
case BYTES:
219+
BytesStatistics bytesStats = featureNameStatistics.getBytesStats();
220+
commonStats = bytesStats.getCommonStats();
221+
featureStatistics.setUnique(bytesStats.getUnique());
222+
featureStatistics.setMaxBytes(bytesStats.getMaxNumBytes());
223+
featureStatistics.setMinBytes(bytesStats.getMinNumBytes());
224+
featureStatistics.setAvgBytes(bytesStats.getAvgNumBytes());
225+
break;
226+
case STRUCT:
227+
StructStatistics structStats = featureNameStatistics.getStructStats();
228+
commonStats = structStats.getCommonStats();
229+
break;
230+
default:
231+
throw new IllegalArgumentException("Feature statistics provided were of unknown type.");
232+
}
233+
featureStatistics.setCount(commonStats.getNumMissing() + commonStats.getNumNonMissing());
234+
featureStatistics.setNumMissing(commonStats.getNumMissing());
235+
featureStatistics.setMinNumValues(commonStats.getMinNumValues());
236+
featureStatistics.setMaxNumValues(commonStats.getMaxNumValues());
237+
featureStatistics.setAvgNumValues(commonStats.getAvgNumValues());
238+
featureStatistics.setTotalNumValues(commonStats.getTotNumValues());
239+
featureStatistics.setNumValuesHistogram(commonStats.getNumValuesHistogram().toByteArray());
240+
241+
return featureStatistics;
242+
}
243+
}

0 commit comments

Comments
 (0)