Skip to content

Commit 033659e

Browse files
authored
[Java Feature Server] Use hgetall in redis connector when number of retrieved fields is big enough (feast-dev#2159)
* hgetall Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * clean up Redis Hash Decoder Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * expected size for collections Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * more cleanup Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * format Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * do not use streams in critical parts Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * enable autoflush in redis cluster client Signed-off-by: pyalex <moskalenko.alexey@gmail.com> * hgetall threshold as constant Signed-off-by: pyalex <moskalenko.alexey@gmail.com>
1 parent 7d4369f commit 033659e

File tree

9 files changed

+158
-126
lines changed

9 files changed

+158
-126
lines changed

java/pom.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,8 @@
565565
<groupId>org.apache.maven.plugins</groupId>
566566
<artifactId>maven-compiler-plugin</artifactId>
567567
<configuration>
568+
<source>11</source>
569+
<target>11</target>
568570
<release>11</release>
569571
<annotationProcessorPaths>
570572
<path>

java/serving/src/main/java/feast/serving/service/OnlineServingServiceV2.java

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -166,13 +166,13 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequestV2 re
166166
storageRetrievalSpan.setTag("entities", entityRows.size());
167167
storageRetrievalSpan.setTag("features", featureReferences.size());
168168
}
169-
List<List<Feature>> entityRowsFeatures =
169+
List<Map<FeatureReferenceV2, Feature>> features =
170170
retriever.getOnlineFeatures(projectName, entityRows, featureReferences, entityNames);
171171
if (storageRetrievalSpan != null) {
172172
storageRetrievalSpan.finish();
173173
}
174174

175-
if (entityRowsFeatures.size() != entityRows.size()) {
175+
if (features.size() != entityRows.size()) {
176176
throw Status.INTERNAL
177177
.withDescription(
178178
"The no. of FeatureRow obtained from OnlineRetriever"
@@ -184,36 +184,30 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequestV2 re
184184

185185
for (int i = 0; i < entityRows.size(); i++) {
186186
GetOnlineFeaturesRequestV2.EntityRow entityRow = entityRows.get(i);
187-
List<Feature> curEntityRowFeatures = entityRowsFeatures.get(i);
188-
189-
Map<FeatureReferenceV2, Feature> featureReferenceFeatureMap =
190-
getFeatureRefFeatureMap(curEntityRowFeatures);
187+
Map<FeatureReferenceV2, Feature> featureRow = features.get(i);
191188

192189
Map<String, ValueProto.Value> rowValues = values.get(i);
193190
Map<String, GetOnlineFeaturesResponse.FieldStatus> rowStatuses = statuses.get(i);
194191

195192
for (FeatureReferenceV2 featureReference : featureReferences) {
196-
if (featureReferenceFeatureMap.containsKey(featureReference)) {
197-
Feature feature = featureReferenceFeatureMap.get(featureReference);
193+
if (featureRow.containsKey(featureReference)) {
194+
Feature feature = featureRow.get(featureReference);
198195

199-
ValueProto.Value value =
200-
feature.getFeatureValue(featureValueTypes.get(feature.getFeatureReference()));
196+
ValueProto.Value value = feature.getFeatureValue(featureValueTypes.get(featureReference));
201197

202198
Boolean isOutsideMaxAge =
203-
checkOutsideMaxAge(
204-
feature, entityRow, featureMaxAges.get(feature.getFeatureReference()));
199+
checkOutsideMaxAge(feature, entityRow, featureMaxAges.get(featureReference));
205200

206201
if (value != null) {
207-
rowValues.put(FeatureV2.getFeatureStringRef(feature.getFeatureReference()), value);
202+
rowValues.put(FeatureV2.getFeatureStringRef(featureReference), value);
208203
} else {
209204
rowValues.put(
210-
FeatureV2.getFeatureStringRef(feature.getFeatureReference()),
205+
FeatureV2.getFeatureStringRef(featureReference),
211206
ValueProto.Value.newBuilder().build());
212207
}
213208

214209
rowStatuses.put(
215-
FeatureV2.getFeatureStringRef(feature.getFeatureReference()),
216-
getMetadata(value, isOutsideMaxAge));
210+
FeatureV2.getFeatureStringRef(featureReference), getMetadata(value, isOutsideMaxAge));
217211
} else {
218212
rowValues.put(
219213
FeatureV2.getFeatureStringRef(featureReference),
@@ -314,11 +308,6 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequestV2 re
314308
return GetOnlineFeaturesResponse.newBuilder().addAllFieldValues(fieldValuesList).build();
315309
}
316310

317-
private static Map<FeatureReferenceV2, Feature> getFeatureRefFeatureMap(List<Feature> features) {
318-
return features.stream()
319-
.collect(Collectors.toMap(Feature::getFeatureReference, Function.identity()));
320-
}
321-
322311
/**
323312
* Generate Field level Status metadata for the given valueMap.
324313
*

java/serving/src/test/java/feast/serving/service/OnlineServingServiceTest.java

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static org.mockito.Mockito.when;
2424
import static org.mockito.MockitoAnnotations.initMocks;
2525

26+
import com.google.common.collect.ImmutableMap;
2627
import com.google.protobuf.Duration;
2728
import com.google.protobuf.Timestamp;
2829
import feast.proto.core.FeatureProto;
@@ -42,6 +43,7 @@
4243
import io.opentracing.Tracer.SpanBuilder;
4344
import java.util.ArrayList;
4445
import java.util.List;
46+
import java.util.Map;
4547
import org.junit.Before;
4648
import org.junit.Test;
4749
import org.mockito.ArgumentMatchers;
@@ -151,14 +153,14 @@ public void shouldReturnResponseWithValuesAndMetadataIfKeysPresent() {
151153
List.of(featureReference1, featureReference2);
152154
GetOnlineFeaturesRequestV2 request = getOnlineFeaturesRequestV2(projectName, featureReferences);
153155

154-
List<Feature> entityKeyList1 = new ArrayList<>();
155-
List<Feature> entityKeyList2 = new ArrayList<>();
156-
entityKeyList1.add(mockedFeatureRows.get(0));
157-
entityKeyList1.add(mockedFeatureRows.get(1));
158-
entityKeyList2.add(mockedFeatureRows.get(2));
159-
entityKeyList2.add(mockedFeatureRows.get(3));
160-
161-
List<List<Feature>> featureRows = List.of(entityKeyList1, entityKeyList2);
156+
List<Map<ServingAPIProto.FeatureReferenceV2, Feature>> featureRows =
157+
List.of(
158+
ImmutableMap.of(
159+
mockedFeatureRows.get(0).getFeatureReference(), mockedFeatureRows.get(0),
160+
mockedFeatureRows.get(1).getFeatureReference(), mockedFeatureRows.get(1)),
161+
ImmutableMap.of(
162+
mockedFeatureRows.get(2).getFeatureReference(), mockedFeatureRows.get(2),
163+
mockedFeatureRows.get(3).getFeatureReference(), mockedFeatureRows.get(3)));
162164

163165
when(retrieverV2.getOnlineFeatures(any(), any(), any(), any())).thenReturn(featureRows);
164166
when(registry.getFeatureViewSpec(any(), any())).thenReturn(getFeatureViewSpec());
@@ -225,7 +227,13 @@ public void shouldReturnResponseWithUnsetValuesAndMetadataIfKeysNotPresent() {
225227
entityKeyList1.add(mockedFeatureRows.get(1));
226228
entityKeyList2.add(mockedFeatureRows.get(4));
227229

228-
List<List<Feature>> featureRows = List.of(entityKeyList1, entityKeyList2);
230+
List<Map<ServingAPIProto.FeatureReferenceV2, Feature>> featureRows =
231+
List.of(
232+
ImmutableMap.of(
233+
mockedFeatureRows.get(0).getFeatureReference(), mockedFeatureRows.get(0),
234+
mockedFeatureRows.get(1).getFeatureReference(), mockedFeatureRows.get(1)),
235+
ImmutableMap.of(
236+
mockedFeatureRows.get(4).getFeatureReference(), mockedFeatureRows.get(4)));
229237

230238
when(retrieverV2.getOnlineFeatures(any(), any(), any(), any())).thenReturn(featureRows);
231239
when(registry.getFeatureViewSpec(any(), any())).thenReturn(getFeatureViewSpec());
@@ -282,14 +290,14 @@ public void shouldReturnResponseWithValuesAndMetadataIfMaxAgeIsExceeded() {
282290
List.of(featureReference1, featureReference2);
283291
GetOnlineFeaturesRequestV2 request = getOnlineFeaturesRequestV2(projectName, featureReferences);
284292

285-
List<Feature> entityKeyList1 = new ArrayList<>();
286-
List<Feature> entityKeyList2 = new ArrayList<>();
287-
entityKeyList1.add(mockedFeatureRows.get(5));
288-
entityKeyList1.add(mockedFeatureRows.get(1));
289-
entityKeyList2.add(mockedFeatureRows.get(5));
290-
entityKeyList2.add(mockedFeatureRows.get(1));
291-
292-
List<List<Feature>> featureRows = List.of(entityKeyList1, entityKeyList2);
293+
List<Map<ServingAPIProto.FeatureReferenceV2, Feature>> featureRows =
294+
List.of(
295+
ImmutableMap.of(
296+
mockedFeatureRows.get(5).getFeatureReference(), mockedFeatureRows.get(5),
297+
mockedFeatureRows.get(1).getFeatureReference(), mockedFeatureRows.get(1)),
298+
ImmutableMap.of(
299+
mockedFeatureRows.get(5).getFeatureReference(), mockedFeatureRows.get(5),
300+
mockedFeatureRows.get(1).getFeatureReference(), mockedFeatureRows.get(1)));
293301

294302
when(retrieverV2.getOnlineFeatures(any(), any(), any(), any())).thenReturn(featureRows);
295303
when(registry.getFeatureViewSpec(any(), any()))

java/storage/api/src/main/java/feast/storage/api/retriever/OnlineRetrieverV2.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import feast.proto.serving.ServingAPIProto;
2020
import java.util.List;
21+
import java.util.Map;
2122

2223
public interface OnlineRetrieverV2 {
2324
/**
@@ -37,7 +38,7 @@ public interface OnlineRetrieverV2 {
3738
* @return list of {@link Feature}s corresponding to data retrieved for each entity row from
3839
* FeatureTable specified in FeatureTable request.
3940
*/
40-
List<List<Feature>> getOnlineFeatures(
41+
List<Map<ServingAPIProto.FeatureReferenceV2, Feature>> getOnlineFeatures(
4142
String project,
4243
List<ServingAPIProto.GetOnlineFeaturesRequestV2.EntityRow> entityRows,
4344
List<ServingAPIProto.FeatureReferenceV2> featureReferences,

java/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/common/RedisHashDecoder.java

Lines changed: 45 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,18 @@
1616
*/
1717
package feast.storage.connectors.redis.common;
1818

19+
import com.google.common.collect.Maps;
1920
import com.google.common.hash.Hashing;
2021
import com.google.protobuf.InvalidProtocolBufferException;
2122
import com.google.protobuf.Timestamp;
2223
import feast.proto.serving.ServingAPIProto;
2324
import feast.proto.types.ValueProto;
2425
import feast.storage.api.retriever.Feature;
2526
import feast.storage.api.retriever.ProtoFeature;
26-
import io.lettuce.core.KeyValue;
27+
import java.nio.ByteBuffer;
2728
import java.nio.charset.StandardCharsets;
2829
import java.util.*;
30+
import java.util.stream.Collectors;
2931

3032
public class RedisHashDecoder {
3133

@@ -35,54 +37,57 @@ public class RedisHashDecoder {
3537
* @param redisHashValues retrieved Redis Hash values based on EntityRows
3638
* @param byteToFeatureReferenceMap map to decode bytes back to FeatureReference
3739
* @param timestampPrefix timestamp prefix
38-
* @return List of {@link Feature}
39-
* @throws InvalidProtocolBufferException if a protocol buffer exception occurs
40+
* @return Map of {@link ServingAPIProto.FeatureReferenceV2} to {@link Feature}
4041
*/
41-
public static List<Feature> retrieveFeature(
42-
List<KeyValue<byte[], byte[]>> redisHashValues,
43-
Map<byte[], ServingAPIProto.FeatureReferenceV2> byteToFeatureReferenceMap,
44-
String timestampPrefix)
45-
throws InvalidProtocolBufferException {
46-
List<Feature> allFeatures = new ArrayList<>();
47-
HashMap<ServingAPIProto.FeatureReferenceV2, ValueProto.Value> featureMap = new HashMap<>();
48-
Map<String, Timestamp> featureTableTimestampMap = new HashMap<>();
42+
public static Map<ServingAPIProto.FeatureReferenceV2, Feature> retrieveFeature(
43+
Map<byte[], byte[]> redisHashValues,
44+
Map<ByteBuffer, ServingAPIProto.FeatureReferenceV2> byteToFeatureReferenceMap,
45+
String timestampPrefix) {
46+
Map<String, Timestamp> featureTableTimestampMap =
47+
redisHashValues.entrySet().stream()
48+
.filter(e -> new String(e.getKey()).startsWith(timestampPrefix))
49+
.collect(
50+
Collectors.toMap(
51+
e -> new String(e.getKey()).substring(timestampPrefix.length() + 1),
52+
e -> {
53+
try {
54+
return Timestamp.parseFrom(e.getValue());
55+
} catch (InvalidProtocolBufferException ex) {
56+
throw new RuntimeException(
57+
"Couldn't parse timestamp proto while pulling data from Redis");
58+
}
59+
}));
60+
Map<ServingAPIProto.FeatureReferenceV2, Feature> results =
61+
Maps.newHashMapWithExpectedSize(byteToFeatureReferenceMap.size());
4962

50-
for (KeyValue<byte[], byte[]> entity : redisHashValues) {
51-
if (entity.hasValue()) {
52-
byte[] redisValueK = entity.getKey();
53-
byte[] redisValueV = entity.getValue();
63+
for (Map.Entry<byte[], byte[]> entry : redisHashValues.entrySet()) {
64+
ServingAPIProto.FeatureReferenceV2 featureReference =
65+
byteToFeatureReferenceMap.get(ByteBuffer.wrap(entry.getKey()));
5466

55-
// Decode data from Redis into Feature object fields
56-
if (new String(redisValueK).startsWith(timestampPrefix)) {
57-
Timestamp eventTimestamp = Timestamp.parseFrom(redisValueV);
58-
featureTableTimestampMap.put(new String(redisValueK), eventTimestamp);
59-
} else {
60-
ServingAPIProto.FeatureReferenceV2 featureReference =
61-
byteToFeatureReferenceMap.get(redisValueK);
62-
ValueProto.Value featureValue = ValueProto.Value.parseFrom(redisValueV);
63-
64-
featureMap.put(featureReference, featureValue);
65-
}
67+
if (featureReference == null) {
68+
continue;
6669
}
67-
}
6870

69-
// Add timestamp to features
70-
for (Map.Entry<ServingAPIProto.FeatureReferenceV2, ValueProto.Value> entry :
71-
featureMap.entrySet()) {
72-
String timestampRedisHashKeyStr = timestampPrefix + ":" + entry.getKey().getFeatureTable();
73-
Timestamp curFeatureTimestamp = featureTableTimestampMap.get(timestampRedisHashKeyStr);
74-
75-
ProtoFeature curFeature =
76-
new ProtoFeature(entry.getKey(), curFeatureTimestamp, entry.getValue());
77-
allFeatures.add(curFeature);
71+
ValueProto.Value v;
72+
try {
73+
v = ValueProto.Value.parseFrom(entry.getValue());
74+
} catch (InvalidProtocolBufferException ex) {
75+
throw new RuntimeException(
76+
"Couldn't parse feature value proto while pulling data from Redis");
77+
}
78+
results.put(
79+
featureReference,
80+
new ProtoFeature(
81+
featureReference,
82+
featureTableTimestampMap.get(featureReference.getFeatureTable()),
83+
v));
7884
}
7985

80-
return allFeatures;
86+
return results;
8187
}
8288

83-
public static byte[] getTimestampRedisHashKeyBytes(
84-
ServingAPIProto.FeatureReferenceV2 featureReference, String timestampPrefix) {
85-
String timestampRedisHashKeyStr = timestampPrefix + ":" + featureReference.getFeatureTable();
89+
public static byte[] getTimestampRedisHashKeyBytes(String featureTable, String timestampPrefix) {
90+
String timestampRedisHashKeyStr = timestampPrefix + ":" + featureTable;
8691
return timestampRedisHashKeyStr.getBytes();
8792
}
8893

0 commit comments

Comments
 (0)