1+ package feast .store .serving .redis ;
2+
3+ import static org .junit .Assert .*;
4+
5+ import com .google .protobuf .Timestamp ;
6+ import feast .core .FeatureSetProto .EntitySpec ;
7+ import feast .core .FeatureSetProto .FeatureSetSpec ;
8+ import feast .core .FeatureSetProto .FeatureSpec ;
9+ import feast .ingestion .transform .ValidateFeatureRows ;
10+ import feast .storage .RedisProto .RedisKey ;
11+ import feast .store .serving .redis .RedisCustomIO .Method ;
12+ import feast .store .serving .redis .RedisCustomIO .RedisMutation ;
13+ import feast .test .TestUtil ;
14+ import feast .types .FeatureRowProto .FeatureRow ;
15+ import feast .types .FieldProto .Field ;
16+ import feast .types .ValueProto .Value ;
17+ import feast .types .ValueProto .ValueType .Enum ;
18+ import java .util .Arrays ;
19+ import java .util .Collections ;
20+ import java .util .HashMap ;
21+ import java .util .Map ;
22+ import org .apache .beam .sdk .extensions .protobuf .ProtoCoder ;
23+ import org .apache .beam .sdk .testing .PAssert ;
24+ import org .apache .beam .sdk .testing .TestPipeline ;
25+ import org .apache .beam .sdk .transforms .Create ;
26+ import org .apache .beam .sdk .transforms .ParDo ;
27+ import org .apache .beam .sdk .transforms .SerializableFunction ;
28+ import org .apache .beam .sdk .values .PCollection ;
29+ import org .junit .Rule ;
30+ import org .junit .Test ;
31+
32+ public class FeatureRowToRedisMutationDoFnTest {
33+
34+ @ Rule
35+ public transient TestPipeline p = TestPipeline .create ();
36+
37+ private FeatureSetSpec fs = FeatureSetSpec .newBuilder ()
38+ .setName ("feature_set" )
39+ .setVersion (1 )
40+ .addEntities (
41+ EntitySpec .newBuilder ()
42+ .setName ("entity_id_primary" )
43+ .setValueType (Enum .INT32 )
44+ .build ())
45+ .addEntities (
46+ EntitySpec .newBuilder ()
47+ .setName ("entity_id_secondary" )
48+ .setValueType (Enum .STRING )
49+ .build ())
50+ .addFeatures (
51+ FeatureSpec .newBuilder ().setName ("feature_1" ).setValueType (Enum .STRING ).build ())
52+ .addFeatures (
53+ FeatureSpec .newBuilder ().setName ("feature_2" ).setValueType (Enum .INT64 ).build ())
54+ .build ();
55+
56+ @ Test
57+ public void shouldConvertRowWithDuplicateEntitiesToValidKey () {
58+ Map <String , FeatureSetSpec > featureSetSpecs = new HashMap <>();
59+ featureSetSpecs .put ("feature_set" , fs );
60+
61+ FeatureRow offendingRow = FeatureRow .newBuilder ()
62+ .setFeatureSet ("feature_set" )
63+ .setEventTimestamp (Timestamp .newBuilder ().setSeconds (10 ))
64+ .addFields (Field .newBuilder ().setName ("entity_id_primary" )
65+ .setValue (Value .newBuilder ().setInt32Val (1 )))
66+ .addFields (Field .newBuilder ().setName ("entity_id_primary" )
67+ .setValue (Value .newBuilder ().setInt32Val (2 )))
68+ .addFields (Field .newBuilder ().setName ("entity_id_secondary" )
69+ .setValue (Value .newBuilder ().setStringVal ("a" )))
70+ .build ();
71+
72+ PCollection <RedisMutation > output = p
73+ .apply (Create .of (Collections .singletonList (offendingRow )))
74+ .setCoder (ProtoCoder .of (FeatureRow .class ))
75+ .apply (ParDo .of (new FeatureRowToRedisMutationDoFn (featureSetSpecs )));
76+
77+ RedisKey expectedKey = RedisKey .newBuilder ()
78+ .setFeatureSet ("feature_set" )
79+ .addEntities (Field .newBuilder ().setName ("entity_id_primary" )
80+ .setValue (Value .newBuilder ().setInt32Val (1 )))
81+ .addEntities (Field .newBuilder ().setName ("entity_id_secondary" )
82+ .setValue (Value .newBuilder ().setStringVal ("a" )))
83+ .build ();
84+
85+ PAssert .that (output ).satisfies ((SerializableFunction <Iterable <RedisMutation >, Void >) input -> {
86+ input .forEach (rm -> {
87+ assert (Arrays .equals (rm .getKey (), expectedKey .toByteArray ()));
88+ assert (Arrays .equals (rm .getValue (), offendingRow .toByteArray ()));
89+ });
90+ return null ;
91+ });
92+ p .run ();
93+ }
94+
95+ @ Test
96+ public void shouldConvertRowWithOutOfOrderEntitiesToValidKey () {
97+ Map <String , FeatureSetSpec > featureSetSpecs = new HashMap <>();
98+ featureSetSpecs .put ("feature_set" , fs );
99+
100+ FeatureRow offendingRow = FeatureRow .newBuilder ()
101+ .setFeatureSet ("feature_set" )
102+ .setEventTimestamp (Timestamp .newBuilder ().setSeconds (10 ))
103+ .addFields (Field .newBuilder ().setName ("entity_id_secondary" )
104+ .setValue (Value .newBuilder ().setStringVal ("a" )))
105+ .addFields (Field .newBuilder ().setName ("entity_id_primary" )
106+ .setValue (Value .newBuilder ().setInt32Val (1 )))
107+ .build ();
108+
109+ PCollection <RedisMutation > output = p
110+ .apply (Create .of (Collections .singletonList (offendingRow )))
111+ .setCoder (ProtoCoder .of (FeatureRow .class ))
112+ .apply (ParDo .of (new FeatureRowToRedisMutationDoFn (featureSetSpecs )));
113+
114+ RedisKey expectedKey = RedisKey .newBuilder ()
115+ .setFeatureSet ("feature_set" )
116+ .addEntities (Field .newBuilder ().setName ("entity_id_primary" )
117+ .setValue (Value .newBuilder ().setInt32Val (1 )))
118+ .addEntities (Field .newBuilder ().setName ("entity_id_secondary" )
119+ .setValue (Value .newBuilder ().setStringVal ("a" )))
120+ .build ();
121+
122+ PAssert .that (output ).satisfies ((SerializableFunction <Iterable <RedisMutation >, Void >) input -> {
123+ input .forEach (rm -> {
124+ assert (Arrays .equals (rm .getKey (), expectedKey .toByteArray ()));
125+ assert (Arrays .equals (rm .getValue (), offendingRow .toByteArray ()));
126+ });
127+ return null ;
128+ });
129+ p .run ();
130+ }
131+
132+ }
0 commit comments