import static feast.proto.types.ValueProto.Value.ValCase.*;
import static feast.storage.connectors.bigquery.common.TypeUtil.*;

import com.google.common.collect.ImmutableList;
import com.google.protobuf.Timestamp;
import feast.proto.types.FeatureRowProto;
import feast.proto.types.FieldProto;
import feast.proto.types.ValueProto;
 * <p>getFeatureRows provides the reverse transformation
 */
4244public class FeatureRowsBatch implements Serializable {
45+ public static final ImmutableList <String > SERVICE_FIELDS =
46+ ImmutableList .of ("eventTimestamp" , "ingestionId" );
4347 private final Schema schema ;
4448 private String featureSetReference ;
4549 private List <Object > values = new ArrayList <>();
@@ -118,6 +122,12 @@ private Schema inferCommonSchema(Iterable<FeatureRowProto.FeatureRow> featureRow
118122 featureSetReference = row .getFeatureSet ();
119123 }
120124 }));
125+
126+ fieldsInOrder .add (
127+ Schema .Field .of ("eventTimestamp" , Schema .FieldType .array (Schema .FieldType .INT64 )));
128+ fieldsInOrder .add (
129+ Schema .Field .of ("ingestionId" , Schema .FieldType .array (Schema .FieldType .STRING )));
130+
121131 Schema schema = Schema .builder ().addFields (fieldsInOrder ).build ();
122132 schema .setUUID (UUID .randomUUID ());
123133 return schema ;
@@ -132,16 +142,33 @@ private void initValues() {
132142 }
133143
134144 private void toColumnar (Iterable <FeatureRowProto .FeatureRow > featureRows ) {
145+ int timestampColumnIdx = schema .indexOf ("eventTimestamp" );
146+ int ingestionIdColumnIdx = schema .indexOf ("ingestionId" );
147+
135148 featureRows .forEach (
136149 row -> {
137- Map <String , ValueProto .Value > rowValues =
138- row .getFieldsList ().stream ()
139- .collect (Collectors .toMap (FieldProto .Field ::getName , FieldProto .Field ::getValue ));
150+ Map <String , ValueProto .Value > rowValues ;
151+ try {
152+ rowValues =
153+ row .getFieldsList ().stream ()
154+ .collect (
155+ Collectors .toMap (FieldProto .Field ::getName , FieldProto .Field ::getValue ));
156+ } catch (IllegalStateException e ) {
157+ // row contains feature duplicates
158+ // omitting for now
159+ return ;
160+ }
140161
141- IntStream .range (0 , schema .getFieldCount ())
162+ schema
163+ .getFieldNames ()
142164 .forEach (
143- idx -> {
144- Schema .Field field = schema .getField (idx );
165+ fieldName -> {
166+ if (SERVICE_FIELDS .contains (fieldName )) {
167+ return ;
168+ }
169+ Schema .Field field = schema .getField (fieldName );
170+ int idx = schema .indexOf (fieldName );
171+
145172 if (rowValues .containsKey (field .getName ())) {
146173 Object o = protoValueToObject (rowValues .get (field .getName ()));
147174 if (o != null ) {
@@ -152,6 +179,10 @@ private void toColumnar(Iterable<FeatureRowProto.FeatureRow> featureRows) {
152179
153180 ((List <Object >) values .get (idx )).add (defaultValues .get (field .getName ()));
154181 });
182+
183+ // adding service fields
184+ ((List <Object >) values .get (timestampColumnIdx )).add (row .getEventTimestamp ().getSeconds ());
185+ ((List <Object >) values .get (ingestionIdColumnIdx )).add (row .getIngestionId ());
155186 });
156187 }
157188
@@ -177,27 +208,45 @@ public static FeatureRowsBatch fromRow(Row row) {
177208 }
178209
179210 public Iterator <FeatureRowProto .FeatureRow > getFeatureRows () {
211+ int timestampColumnIdx = schema .indexOf ("eventTimestamp" );
212+ int ingestionIdColumnIdx = schema .indexOf ("ingestionId" );
213+
180214 return IntStream .range (0 , ((List <Object >) values .get (0 )).size ())
181215 .parallel ()
182216 .mapToObj (
183217 rowIdx ->
184218 FeatureRowProto .FeatureRow .newBuilder ()
185219 .setFeatureSet (getFeatureSetReference ())
220+ .setEventTimestamp (
221+ Timestamp .newBuilder ()
222+ .setSeconds (
223+ (long )
224+ (((List <Object >) values .get (timestampColumnIdx )).get (rowIdx )))
225+ .build ())
226+ .setIngestionId (
227+ (String ) (((List <Object >) values .get (ingestionIdColumnIdx )).get (rowIdx )))
186228 .addAllFields (
187- IntStream .range (0 , schema .getFieldCount ())
188- .mapToObj (
189- fieldIdx ->
190- FieldProto .Field .newBuilder ()
191- .setName (schema .getField (fieldIdx ).getName ())
192- .setValue (
193- objectToProtoValue (
194- ((List <Object >) values .get (fieldIdx )).get (rowIdx ),
195- schemaToProtoTypes .get (
196- schema
197- .getField (fieldIdx )
198- .getType ()
199- .getCollectionElementType ())))
200- .build ())
229+ schema .getFieldNames ().stream ()
230+ .map (
231+ fieldName -> {
232+ if (SERVICE_FIELDS .contains (fieldName )) {
233+ return null ;
234+ }
235+ int fieldIdx = schema .indexOf (fieldName );
236+
237+ return FieldProto .Field .newBuilder ()
238+ .setName (schema .getField (fieldIdx ).getName ())
239+ .setValue (
240+ objectToProtoValue (
241+ ((List <Object >) values .get (fieldIdx )).get (rowIdx ),
242+ schemaToProtoTypes .get (
243+ schema
244+ .getField (fieldIdx )
245+ .getType ()
246+ .getCollectionElementType ())))
247+ .build ();
248+ })
249+ .filter (Objects ::nonNull )
201250 .collect (Collectors .toList ()))
202251 .build ())
203252 .iterator ();
0 commit comments