Skip to content

Commit 8572675

Browse files
committed
Refactored code and modified tests to remove timestamps
1 parent 75b1aa6 commit 8572675

File tree

16 files changed

+145
-227
lines changed

16 files changed

+145
-227
lines changed

protos/feast/storage/Redis.proto

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616

1717
syntax = "proto3";
18-
import "google/protobuf/timestamp.proto";
1918

2019
import "feast/types/Field.proto";
2120

serving/src/main/java/feast/serving/config/ServingApiConfiguration.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919

2020
import com.google.common.util.concurrent.ListeningExecutorService;
2121
import com.google.common.util.concurrent.MoreExecutors;
22-
import com.google.gson.Gson;
23-
import com.google.gson.reflect.TypeToken;
2422
import feast.core.StoreProto.Store;
2523
import feast.serving.service.CachedSpecStorage;
2624
import feast.serving.service.CoreService;
@@ -29,10 +27,7 @@
2927
import feast.serving.service.SpecStorage;
3028
import io.opentracing.Tracer;
3129
import io.opentracing.contrib.concurrent.TracedExecutorService;
32-
import java.lang.reflect.Type;
33-
import java.util.Collections;
3430
import java.util.List;
35-
import java.util.Map;
3631
import java.util.concurrent.ExecutorService;
3732
import java.util.concurrent.Executors;
3833
import java.util.concurrent.ScheduledExecutorService;
@@ -117,7 +112,7 @@ public FeastServing getFeastServing(
117112
store.getRedisConfig().getPort());
118113
return new RedisFeastServing(jedisPool, tracer);
119114
case BIGQUERY:
120-
// TODO: Implement connection to Bigquery
115+
// TODO: Implement connection to BigQuery
121116
return null;
122117
default:
123118
return null;

serving/src/main/java/feast/serving/model/FeatureValue.java

Lines changed: 0 additions & 40 deletions
This file was deleted.

serving/src/main/java/feast/serving/service/CachedSpecStorage.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,14 @@ public class CachedSpecStorage implements SpecStorage {
3535

3636
private static final int MAX_SPEC_COUNT = 1000;
3737

38-
private final CoreService coreService;
38+
private final SpecStorage coreService;
3939
private final String storeId;
4040

4141
private final CacheLoader<String, FeatureSetSpec> featureSetSpecCacheLoader;
4242
private final LoadingCache<String, FeatureSetSpec> featureSetSpecCache;
4343
private Store store;
4444

45-
public CachedSpecStorage(CoreService coreService, String storeId) {
45+
public CachedSpecStorage(SpecStorage coreService, String storeId) {
4646
this.storeId = storeId;
4747
this.coreService = coreService;
4848
this.store = coreService.getStoreDetails(storeId);
@@ -54,16 +54,25 @@ public CachedSpecStorage(CoreService coreService, String storeId) {
5454
CacheBuilder.newBuilder().maximumSize(MAX_SPEC_COUNT).build(featureSetSpecCacheLoader);
5555
}
5656

57+
/**
58+
* {@inheritDoc}
59+
*/
5760
@Override
5861
public Store getStoreDetails(String id) {
5962
return store;
6063
}
6164

65+
/**
66+
* {@inheritDoc}
67+
*/
6268
@Override
6369
public Map<String, FeatureSetSpec> getFeatureSetSpecs(List<Subscription> subscriptions) {
6470
return featureSetSpecCache.asMap();
6571
}
6672

73+
/**
74+
* {@inheritDoc}
75+
*/
6776
@Override
6877
public boolean isConnected() {
6978
return coreService.isConnected();

serving/src/main/java/feast/serving/service/CoreService.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,11 @@ public Store getStoreDetails(String id) {
5959
GetStoresRequest request = GetStoresRequest.newBuilder()
6060
.setFilter(Filter.newBuilder().setName(id)).build();
6161
GetStoresResponse response = blockingStub.getStores(request);
62-
63-
for (Store store : response.getStoreList()) {
64-
if (store.getName().equals(id)) {
65-
return store;
66-
}
67-
}
68-
69-
throw new SpecRetrievalException(String.format("Unable to find store with name: %s", id));
62+
return response.getStoreList()
63+
.stream()
64+
.filter(s -> s.getName().equals(id)).findFirst()
65+
.orElseThrow(() -> new SpecRetrievalException(
66+
String.format("Unable to find store with name: %s", id)));
7067
}
7168

7269
@Override

serving/src/main/java/feast/serving/service/RedisFeastServing.java

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,10 @@
2929
import feast.serving.ServingAPIProto.GetFeastServingTypeResponse;
3030
import feast.serving.ServingAPIProto.GetFeastServingVersionResponse;
3131
import feast.serving.ServingAPIProto.GetFeaturesRequest;
32-
import feast.serving.ServingAPIProto.GetFeaturesRequest.EntityDataSet;
3332
import feast.serving.ServingAPIProto.GetFeaturesRequest.EntityDataSetRow;
3433
import feast.serving.ServingAPIProto.GetFeaturesRequest.FeatureSet;
3534
import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse;
3635
import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse.FeatureDataSet;
37-
import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse.FeatureDataSet.Builder;
3836
import feast.serving.exception.FeatureRetrievalException;
3937
import feast.storage.RedisProto.RedisKey;
4038
import feast.types.FeatureProto.Field;
@@ -44,7 +42,6 @@
4442
import io.opentracing.Tracer;
4543
import java.util.ArrayList;
4644
import java.util.List;
47-
import java.util.Set;
4845
import java.util.stream.Collectors;
4946
import lombok.extern.slf4j.Slf4j;
5047
import redis.clients.jedis.Jedis;
@@ -79,7 +76,6 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetFeaturesRequest request) {
7976
List<String> entityNames = request.getEntityDataSet().getFieldNamesList();
8077
List<EntityDataSetRow> entityDataSetRows = request.getEntityDataSet()
8178
.getEntityDataSetRowsList();
82-
8379
GetOnlineFeaturesResponse.Builder getOnlineFeatureResponseBuilder = GetOnlineFeaturesResponse
8480
.newBuilder();
8581

@@ -92,38 +88,28 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetFeaturesRequest request) {
9288
redisKeys.add(makeRedisKey(featureSetId, entityNames, entityDataSetRow));
9389
}
9490

95-
// Fetch values
96-
List<byte[]> jedisResps = sendMultiGet(redisKeys);
97-
91+
// Convert ProtocolStringList to list of Strings
9892
List<String> requestedColumns = featureSet.getFeatureNamesList()
9993
.asByteStringList().stream()
10094
.map(ByteString::toStringUtf8)
10195
.collect(Collectors.toList());
10296
requestedColumns.addAll(entityNames);
10397
requestedColumns.add("datetime");
10498

105-
Builder featureDataSetBuilder = FeatureDataSet.newBuilder();
99+
List<FeatureRow> featureRows = new ArrayList<>();
106100
try {
107-
for (byte[] jedisResp : jedisResps) {
108-
FeatureRow.Builder featureRowBuilder = FeatureRow.parseFrom(jedisResp).toBuilder();
109-
for (int i = 0; i < featureRowBuilder.getFieldsCount(); i++) {
110-
if (!requestedColumns.contains(featureRowBuilder.getFields(i).getName())) {
111-
featureRowBuilder.removeFields(i);
112-
}
113-
}
114-
featureDataSetBuilder.addFeatureRows(featureRowBuilder.build());
115-
}
101+
featureRows = sendAndProcessMultiGet(redisKeys, requestedColumns, featureSet);
116102
} catch (NullPointerException e) {
117103
log.error("No keys matching {} found in store", redisKeys);
118104
} catch (InvalidProtocolBufferException e) {
119105
log.error("Unable to parse protobuf", e);
120106
throw new FeatureRetrievalException("Unable to parse protobuf while retrieving feature",
121107
e);
108+
} finally {
109+
FeatureDataSet featureDataSet = FeatureDataSet.newBuilder().setName(featureSet.getName())
110+
.setVersion(featureSet.getVersion()).addAllFeatureRows(featureRows).build();
111+
getOnlineFeatureResponseBuilder.addFeatureDataSets(featureDataSet);
122112
}
123-
featureDataSetBuilder.setName(featureSet.getName());
124-
featureDataSetBuilder.setVersion(featureSet.getVersion());
125-
126-
getOnlineFeatureResponseBuilder.addFeatureDataSets(featureDataSetBuilder.build());
127113
}
128114

129115
return getOnlineFeatureResponseBuilder.build();
@@ -155,6 +141,23 @@ public SetUploadCompleteResponse setBatchFeaturesJobUploadComplete(
155141
return null;
156142
}
157143

144+
private List<FeatureRow> sendAndProcessMultiGet(List<RedisKey> redisKeys,
145+
List<String> requestedColumns, FeatureSet featureSet) throws InvalidProtocolBufferException {
146+
List<byte[]> jedisResps = sendMultiGet(redisKeys);
147+
148+
List<FeatureRow> featureRows = new ArrayList<>();
149+
for (byte[] jedisResp : jedisResps) {
150+
FeatureRow featureRow = FeatureRow.parseFrom(jedisResp);
151+
List<Field> fields = featureRow.getFieldsList().stream()
152+
.filter(f -> requestedColumns.contains(f.getName())).collect(Collectors.toList());
153+
featureRows.add(FeatureRow.newBuilder().addAllFields(fields)
154+
.setEventTimestamp(featureRow.getEventTimestamp())
155+
.setFeatureSet(String.format("%s:%s", featureSet.getName(), featureSet.getVersion()))
156+
.build());
157+
}
158+
return featureRows;
159+
}
160+
158161
/**
159162
* Send a list of get request as an mget
160163
*

serving/src/main/java/feast/serving/service/SpecStorage.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,27 @@
2525

2626
public interface SpecStorage {
2727

28+
/**
29+
* Get backing store information
30+
*
31+
* @param id storeId, name of the store
32+
* @return {@link Store}
33+
*/
2834
Store getStoreDetails(String id);
2935

36+
/**
37+
* Get a map of {@link FeatureSetSpec} of a list of subscription, where the featureSetId
38+
* (e.g. feature_set_name:1) is the key
39+
*
40+
* @return Map of featureSetId and FeatureSetSpec as a key:value pair
41+
*/
3042
Map<String, FeatureSetSpec> getFeatureSetSpecs(List<Subscription> subscriptions);
3143

44+
/**
45+
* Check whether connection to core service is ready
46+
*
47+
* @return return true if it is ready. Otherwise, return false
48+
*/
3249
boolean isConnected();
3350

3451
}

serving/src/main/java/feast/serving/util/EntityMapBuilder.java

Lines changed: 0 additions & 69 deletions
This file was deleted.

serving/src/main/java/feast/serving/util/RequestHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public static void validateRequest(GetFeaturesRequest request) {
3131
throw new IllegalArgumentException("Entity value must be provided");
3232
}
3333

34-
// Value list size in EntityDataSetRow shall be the same as the size of fieldNames + 1
34+
// Value list size in EntityDataSetRow shall be the same as the size of fieldNames
3535
// First entity value will always be timestamp in EntityDataSetRow
3636
int fieldNameCount = request.getEntityDataSet().getFieldNamesCount();
3737
for (EntityDataSetRow edsr : request.getEntityDataSet().getEntityDataSetRowsList()) {

serving/src/main/java/feast/serving/util/StatsUtil.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
*/
1717
package feast.serving.util;
1818

19-
import com.google.common.base.Strings;
20-
//import feast.serving.ServingAPIProto.QueryFeaturesRequest;
2119
import feast.serving.ServingAPIProto.GetFeaturesRequest;
2220
import feast.serving.ServingAPIProto.GetFeaturesRequest.FeatureSet;
2321
import io.grpc.Context;
@@ -26,6 +24,8 @@
2624
import java.util.ArrayList;
2725
import java.util.List;
2826

27+
//import feast.serving.ServingAPIProto.QueryFeaturesRequest;
28+
2929
/**
3030
* Utility class for statistics.
3131
*/

0 commit comments

Comments
 (0)