Skip to content

Commit a7eb4dc

Browse files
Chen Zhiling and davidheryanto authored
Make redis key creation more deterministic (#380) (#471)
* Make redis key creation more deterministic (#380)
* Add documentation to RedisKey in Redis.proto
Ensure entities are sorted by the name
Co-authored-by: David Heryanto <david.heryanto@hotmail.com>
1 parent 1771532 commit a7eb4dc

File tree

3 files changed

+197
-5
lines changed

3 files changed

+197
-5
lines changed

ingestion/src/main/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFn.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@
2424
import feast.store.serving.redis.RedisCustomIO.RedisMutation;
2525
import feast.types.FeatureRowProto.FeatureRow;
2626
import feast.types.FieldProto.Field;
27+
import java.util.HashMap;
28+
import java.util.List;
2729
import java.util.Map;
28-
import java.util.Set;
2930
import java.util.stream.Collectors;
3031
import org.apache.beam.sdk.transforms.DoFn;
3132
import org.slf4j.Logger;
@@ -42,17 +43,24 @@ public FeatureRowToRedisMutationDoFn(Map<String, FeatureSet> featureSets) {
4243

4344
private RedisKey getKey(FeatureRow featureRow) {
4445
FeatureSet featureSet = featureSets.get(featureRow.getFeatureSet());
45-
Set<String> entityNames =
46+
List<String> entityNames =
4647
featureSet.getSpec().getEntitiesList().stream()
4748
.map(EntitySpec::getName)
48-
.collect(Collectors.toSet());
49+
.sorted()
50+
.collect(Collectors.toList());
4951

52+
Map<String, Field> entityFields = new HashMap<>();
5053
Builder redisKeyBuilder = RedisKey.newBuilder().setFeatureSet(featureRow.getFeatureSet());
5154
for (Field field : featureRow.getFieldsList()) {
5255
if (entityNames.contains(field.getName())) {
53-
redisKeyBuilder.addEntities(field);
56+
entityFields.putIfAbsent(
57+
field.getName(),
58+
Field.newBuilder().setName(field.getName()).setValue(field.getValue()).build());
5459
}
5560
}
61+
for (String entityName : entityNames) {
62+
redisKeyBuilder.addEntities(entityFields.get(entityName));
63+
}
5664
return redisKeyBuilder.build();
5765
}
5866

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright 2018-2020 The Feast Authors
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package feast.store.serving.redis;
18+
19+
import static org.junit.Assert.*;
20+
21+
import com.google.protobuf.Timestamp;
22+
import feast.core.FeatureSetProto;
23+
import feast.core.FeatureSetProto.EntitySpec;
24+
import feast.core.FeatureSetProto.FeatureSetSpec;
25+
import feast.core.FeatureSetProto.FeatureSpec;
26+
import feast.storage.RedisProto.RedisKey;
27+
import feast.store.serving.redis.RedisCustomIO.RedisMutation;
28+
import feast.types.FeatureRowProto.FeatureRow;
29+
import feast.types.FieldProto.Field;
30+
import feast.types.ValueProto.Value;
31+
import feast.types.ValueProto.ValueType.Enum;
32+
import java.util.Arrays;
33+
import java.util.Collections;
34+
import java.util.HashMap;
35+
import java.util.Map;
36+
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
37+
import org.apache.beam.sdk.testing.PAssert;
38+
import org.apache.beam.sdk.testing.TestPipeline;
39+
import org.apache.beam.sdk.transforms.Create;
40+
import org.apache.beam.sdk.transforms.ParDo;
41+
import org.apache.beam.sdk.transforms.SerializableFunction;
42+
import org.apache.beam.sdk.values.PCollection;
43+
import org.junit.Rule;
44+
import org.junit.Test;
45+
46+
public class FeatureRowToRedisMutationDoFnTest {
47+
48+
@Rule public transient TestPipeline p = TestPipeline.create();
49+
50+
private FeatureSetProto.FeatureSet fs =
51+
FeatureSetProto.FeatureSet.newBuilder()
52+
.setSpec(
53+
FeatureSetSpec.newBuilder()
54+
.setName("feature_set")
55+
.setVersion(1)
56+
.addEntities(
57+
EntitySpec.newBuilder()
58+
.setName("entity_id_primary")
59+
.setValueType(Enum.INT32)
60+
.build())
61+
.addEntities(
62+
EntitySpec.newBuilder()
63+
.setName("entity_id_secondary")
64+
.setValueType(Enum.STRING)
65+
.build())
66+
.addFeatures(
67+
FeatureSpec.newBuilder()
68+
.setName("feature_1")
69+
.setValueType(Enum.STRING)
70+
.build())
71+
.addFeatures(
72+
FeatureSpec.newBuilder()
73+
.setName("feature_2")
74+
.setValueType(Enum.INT64)
75+
.build()))
76+
.build();
77+
78+
@Test
79+
public void shouldConvertRowWithDuplicateEntitiesToValidKey() {
80+
Map<String, FeatureSetProto.FeatureSet> featureSets = new HashMap<>();
81+
featureSets.put("feature_set", fs);
82+
83+
FeatureRow offendingRow =
84+
FeatureRow.newBuilder()
85+
.setFeatureSet("feature_set")
86+
.setEventTimestamp(Timestamp.newBuilder().setSeconds(10))
87+
.addFields(
88+
Field.newBuilder()
89+
.setName("entity_id_primary")
90+
.setValue(Value.newBuilder().setInt32Val(1)))
91+
.addFields(
92+
Field.newBuilder()
93+
.setName("entity_id_primary")
94+
.setValue(Value.newBuilder().setInt32Val(2)))
95+
.addFields(
96+
Field.newBuilder()
97+
.setName("entity_id_secondary")
98+
.setValue(Value.newBuilder().setStringVal("a")))
99+
.build();
100+
101+
PCollection<RedisMutation> output =
102+
p.apply(Create.of(Collections.singletonList(offendingRow)))
103+
.setCoder(ProtoCoder.of(FeatureRow.class))
104+
.apply(ParDo.of(new FeatureRowToRedisMutationDoFn(featureSets)));
105+
106+
RedisKey expectedKey =
107+
RedisKey.newBuilder()
108+
.setFeatureSet("feature_set")
109+
.addEntities(
110+
Field.newBuilder()
111+
.setName("entity_id_primary")
112+
.setValue(Value.newBuilder().setInt32Val(1)))
113+
.addEntities(
114+
Field.newBuilder()
115+
.setName("entity_id_secondary")
116+
.setValue(Value.newBuilder().setStringVal("a")))
117+
.build();
118+
119+
PAssert.that(output)
120+
.satisfies(
121+
(SerializableFunction<Iterable<RedisMutation>, Void>)
122+
input -> {
123+
input.forEach(
124+
rm -> {
125+
assert (Arrays.equals(rm.getKey(), expectedKey.toByteArray()));
126+
assert (Arrays.equals(rm.getValue(), offendingRow.toByteArray()));
127+
});
128+
return null;
129+
});
130+
p.run();
131+
}
132+
133+
@Test
134+
public void shouldConvertRowWithOutOfOrderEntitiesToValidKey() {
135+
Map<String, FeatureSetProto.FeatureSet> featureSets = new HashMap<>();
136+
featureSets.put("feature_set", fs);
137+
138+
FeatureRow offendingRow =
139+
FeatureRow.newBuilder()
140+
.setFeatureSet("feature_set")
141+
.setEventTimestamp(Timestamp.newBuilder().setSeconds(10))
142+
.addFields(
143+
Field.newBuilder()
144+
.setName("entity_id_secondary")
145+
.setValue(Value.newBuilder().setStringVal("a")))
146+
.addFields(
147+
Field.newBuilder()
148+
.setName("entity_id_primary")
149+
.setValue(Value.newBuilder().setInt32Val(1)))
150+
.build();
151+
152+
PCollection<RedisMutation> output =
153+
p.apply(Create.of(Collections.singletonList(offendingRow)))
154+
.setCoder(ProtoCoder.of(FeatureRow.class))
155+
.apply(ParDo.of(new FeatureRowToRedisMutationDoFn(featureSets)));
156+
157+
RedisKey expectedKey =
158+
RedisKey.newBuilder()
159+
.setFeatureSet("feature_set")
160+
.addEntities(
161+
Field.newBuilder()
162+
.setName("entity_id_primary")
163+
.setValue(Value.newBuilder().setInt32Val(1)))
164+
.addEntities(
165+
Field.newBuilder()
166+
.setName("entity_id_secondary")
167+
.setValue(Value.newBuilder().setStringVal("a")))
168+
.build();
169+
170+
PAssert.that(output)
171+
.satisfies(
172+
(SerializableFunction<Iterable<RedisMutation>, Void>)
173+
input -> {
174+
input.forEach(
175+
rm -> {
176+
assert (Arrays.equals(rm.getKey(), expectedKey.toByteArray()));
177+
assert (Arrays.equals(rm.getValue(), offendingRow.toByteArray()));
178+
});
179+
return null;
180+
});
181+
p.run();
182+
}
183+
}

protos/feast/storage/Redis.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ message RedisKey {
3232
string feature_set = 2;
3333

3434
// List of fields containing entity names and their respective values
35-
// contained within this feature row.
35+
// contained within this feature row. The entities should be sorted
36+
// by the entity name alphabetically in ascending order.
3637
repeated feast.types.Field entities = 3;
3738
}

0 commit comments

Comments
 (0)