Skip to content

Commit f11014f

Browse files
Chen Zhilingfeast-ci-bot
authored andcommitted
Make redis key creation more determinisitic (#380)
* Make redis key creation more determinisitic * Sort entity names
1 parent ec0db7c commit f11014f

2 files changed

Lines changed: 148 additions & 3 deletions

File tree

ingestion/src/main/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFn.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import feast.store.serving.redis.RedisCustomIO.RedisMutation;
2525
import feast.types.FeatureRowProto.FeatureRow;
2626
import feast.types.FieldProto.Field;
27+
import feast.types.ValueProto.Value;
28+
import java.util.HashMap;
29+
import java.util.List;
2730
import java.util.Map;
2831
import java.util.Set;
2932
import java.util.stream.Collectors;
@@ -42,17 +45,27 @@ public FeatureRowToRedisMutationDoFn(Map<String, FeatureSetSpec> featureSetSpecs
4245

4346
private RedisKey getKey(FeatureRow featureRow) {
4447
FeatureSetSpec featureSetSpec = featureSetSpecs.get(featureRow.getFeatureSet());
45-
Set<String> entityNames =
48+
List<String> entityNames =
4649
featureSetSpec.getEntitiesList().stream()
4750
.map(EntitySpec::getName)
48-
.collect(Collectors.toSet());
51+
.sorted()
52+
.collect(Collectors.toList());
4953

54+
Map<String, Field> entityFields = new HashMap<>();
5055
Builder redisKeyBuilder = RedisKey.newBuilder().setFeatureSet(featureRow.getFeatureSet());
5156
for (Field field : featureRow.getFieldsList()) {
5257
if (entityNames.contains(field.getName())) {
53-
redisKeyBuilder.addEntities(field);
58+
entityFields.putIfAbsent(field.getName(),
59+
Field.newBuilder()
60+
.setName(field.getName())
61+
.setValue(field.getValue())
62+
.build()
63+
);
5464
}
5565
}
66+
for (String entityName : entityNames) {
67+
redisKeyBuilder.addEntities(entityFields.get(entityName));
68+
}
5669
return redisKeyBuilder.build();
5770
}
5871

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package feast.store.serving.redis;
2+
3+
import static org.junit.Assert.*;
4+
5+
import com.google.protobuf.Timestamp;
6+
import feast.core.FeatureSetProto.EntitySpec;
7+
import feast.core.FeatureSetProto.FeatureSetSpec;
8+
import feast.core.FeatureSetProto.FeatureSpec;
9+
import feast.ingestion.transform.ValidateFeatureRows;
10+
import feast.storage.RedisProto.RedisKey;
11+
import feast.store.serving.redis.RedisCustomIO.Method;
12+
import feast.store.serving.redis.RedisCustomIO.RedisMutation;
13+
import feast.test.TestUtil;
14+
import feast.types.FeatureRowProto.FeatureRow;
15+
import feast.types.FieldProto.Field;
16+
import feast.types.ValueProto.Value;
17+
import feast.types.ValueProto.ValueType.Enum;
18+
import java.util.Arrays;
19+
import java.util.Collections;
20+
import java.util.HashMap;
21+
import java.util.Map;
22+
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
23+
import org.apache.beam.sdk.testing.PAssert;
24+
import org.apache.beam.sdk.testing.TestPipeline;
25+
import org.apache.beam.sdk.transforms.Create;
26+
import org.apache.beam.sdk.transforms.ParDo;
27+
import org.apache.beam.sdk.transforms.SerializableFunction;
28+
import org.apache.beam.sdk.values.PCollection;
29+
import org.junit.Rule;
30+
import org.junit.Test;
31+
32+
public class FeatureRowToRedisMutationDoFnTest {
33+
34+
@Rule
35+
public transient TestPipeline p = TestPipeline.create();
36+
37+
private FeatureSetSpec fs = FeatureSetSpec.newBuilder()
38+
.setName("feature_set")
39+
.setVersion(1)
40+
.addEntities(
41+
EntitySpec.newBuilder()
42+
.setName("entity_id_primary")
43+
.setValueType(Enum.INT32)
44+
.build())
45+
.addEntities(
46+
EntitySpec.newBuilder()
47+
.setName("entity_id_secondary")
48+
.setValueType(Enum.STRING)
49+
.build())
50+
.addFeatures(
51+
FeatureSpec.newBuilder().setName("feature_1").setValueType(Enum.STRING).build())
52+
.addFeatures(
53+
FeatureSpec.newBuilder().setName("feature_2").setValueType(Enum.INT64).build())
54+
.build();
55+
56+
@Test
57+
public void shouldConvertRowWithDuplicateEntitiesToValidKey() {
58+
Map<String, FeatureSetSpec> featureSetSpecs = new HashMap<>();
59+
featureSetSpecs.put("feature_set", fs);
60+
61+
FeatureRow offendingRow = FeatureRow.newBuilder()
62+
.setFeatureSet("feature_set")
63+
.setEventTimestamp(Timestamp.newBuilder().setSeconds(10))
64+
.addFields(Field.newBuilder().setName("entity_id_primary")
65+
.setValue(Value.newBuilder().setInt32Val(1)))
66+
.addFields(Field.newBuilder().setName("entity_id_primary")
67+
.setValue(Value.newBuilder().setInt32Val(2)))
68+
.addFields(Field.newBuilder().setName("entity_id_secondary")
69+
.setValue(Value.newBuilder().setStringVal("a")))
70+
.build();
71+
72+
PCollection<RedisMutation> output = p
73+
.apply(Create.of(Collections.singletonList(offendingRow)))
74+
.setCoder(ProtoCoder.of(FeatureRow.class))
75+
.apply(ParDo.of(new FeatureRowToRedisMutationDoFn(featureSetSpecs)));
76+
77+
RedisKey expectedKey = RedisKey.newBuilder()
78+
.setFeatureSet("feature_set")
79+
.addEntities(Field.newBuilder().setName("entity_id_primary")
80+
.setValue(Value.newBuilder().setInt32Val(1)))
81+
.addEntities(Field.newBuilder().setName("entity_id_secondary")
82+
.setValue(Value.newBuilder().setStringVal("a")))
83+
.build();
84+
85+
PAssert.that(output).satisfies((SerializableFunction<Iterable<RedisMutation>, Void>) input -> {
86+
input.forEach(rm -> {
87+
assert(Arrays.equals(rm.getKey(), expectedKey.toByteArray()));
88+
assert(Arrays.equals(rm.getValue(), offendingRow.toByteArray()));
89+
});
90+
return null;
91+
});
92+
p.run();
93+
}
94+
95+
@Test
96+
public void shouldConvertRowWithOutOfOrderEntitiesToValidKey() {
97+
Map<String, FeatureSetSpec> featureSetSpecs = new HashMap<>();
98+
featureSetSpecs.put("feature_set", fs);
99+
100+
FeatureRow offendingRow = FeatureRow.newBuilder()
101+
.setFeatureSet("feature_set")
102+
.setEventTimestamp(Timestamp.newBuilder().setSeconds(10))
103+
.addFields(Field.newBuilder().setName("entity_id_secondary")
104+
.setValue(Value.newBuilder().setStringVal("a")))
105+
.addFields(Field.newBuilder().setName("entity_id_primary")
106+
.setValue(Value.newBuilder().setInt32Val(1)))
107+
.build();
108+
109+
PCollection<RedisMutation> output = p
110+
.apply(Create.of(Collections.singletonList(offendingRow)))
111+
.setCoder(ProtoCoder.of(FeatureRow.class))
112+
.apply(ParDo.of(new FeatureRowToRedisMutationDoFn(featureSetSpecs)));
113+
114+
RedisKey expectedKey = RedisKey.newBuilder()
115+
.setFeatureSet("feature_set")
116+
.addEntities(Field.newBuilder().setName("entity_id_primary")
117+
.setValue(Value.newBuilder().setInt32Val(1)))
118+
.addEntities(Field.newBuilder().setName("entity_id_secondary")
119+
.setValue(Value.newBuilder().setStringVal("a")))
120+
.build();
121+
122+
PAssert.that(output).satisfies((SerializableFunction<Iterable<RedisMutation>, Void>) input -> {
123+
input.forEach(rm -> {
124+
assert(Arrays.equals(rm.getKey(), expectedKey.toByteArray()));
125+
assert(Arrays.equals(rm.getValue(), offendingRow.toByteArray()));
126+
});
127+
return null;
128+
});
129+
p.run();
130+
}
131+
132+
}

0 commit comments

Comments
 (0)